tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

testcases.py (13907B)


      1 # This Source Code Form is subject to the terms of the Mozilla Public
      2 # License, v. 2.0. If a copy of the MPL was not distributed with this
      3 # file, You can obtain one at http://mozilla.org/MPL/2.0/.
      4 
      5 
      6 import os
      7 import re
      8 import sys
      9 import time
     10 import unittest
     11 import warnings
     12 import weakref
     13 from unittest.case import SkipTest
     14 
     15 from marionette_driver.errors import TimeoutException, UnresponsiveInstanceException
     16 from mozfile import load_source
     17 from mozlog import get_default_logger
     18 
     19 
     20 # With Python 3 both expectedFailure and unexpectedSuccess are
     21 # available in unittest/case.py but won't work here because both
     22 # do not inherit from BaseException. And that's currently needed
     23 # in our custom test status handling in `run()`.
     24 class expectedFailure(Exception):
     25    """
     26    Raise this when a test is expected to fail.
     27 
     28    This is an implementation detail.
     29    """
     30 
     31    def __init__(self, exc_info):
     32        super().__init__()
     33        self.exc_info = exc_info
     34 
     35 
     36 class unexpectedSuccess(Exception):
     37    """
     38    The test was supposed to fail, but it didn't!
     39    """
     40 
     41    pass
     42 
     43 
     44 def _wraps_parameterized(func, func_suffix, args, kwargs):
     45    """Internal: Decorator used in class MetaParameterized."""
     46 
     47    def wrapper(self):
     48        return func(self, *args, **kwargs)
     49 
     50    wrapper.__name__ = func.__name__ + "_" + str(func_suffix)
     51    wrapper.__doc__ = f"[{func_suffix}] {func.__doc__}"
     52    return wrapper
     53 
     54 
     55 class MetaParameterized(type):
     56    """
     57    A metaclass that allow a class to use decorators.
     58 
     59    It can be used like :func:`parameterized`
     60    or :func:`with_parameters` to generate new methods.
     61    """
     62 
     63    RE_ESCAPE_BAD_CHARS = re.compile(r"[\.\(\) -/]")
     64 
     65    def __new__(cls, name, bases, attrs):
     66        for k, v in list(attrs.items()):
     67            if callable(v) and hasattr(v, "metaparameters"):
     68                for func_suffix, args, kwargs in v.metaparameters:
     69                    func_suffix = cls.RE_ESCAPE_BAD_CHARS.sub("_", func_suffix)
     70                    wrapper = _wraps_parameterized(v, func_suffix, args, kwargs)
     71                    if wrapper.__name__ in attrs:
     72                        raise KeyError(
     73                            f"{wrapper.__name__} is already a defined method on {name}"
     74                        )
     75                    attrs[wrapper.__name__] = wrapper
     76                del attrs[k]
     77 
     78        return type.__new__(cls, name, bases, attrs)
     79 
     80 
     81 class CommonTestCase(unittest.TestCase, metaclass=MetaParameterized):
     82    match_re = None
     83    failureException = AssertionError
     84    pydebugger = None
     85 
     86    def __init__(self, methodName, marionette_weakref, fixtures, **kwargs):
     87        super().__init__(methodName)
     88        self.methodName = methodName
     89 
     90        self._marionette_weakref = marionette_weakref
     91        self.fixtures = fixtures
     92 
     93        self.duration = 0
     94        self.start_time = 0
     95        self.expected = kwargs.pop("expected", "pass")
     96        self.logger = get_default_logger()
     97 
     98    def _enter_pm(self):
     99        if self.pydebugger:
    100            self.pydebugger.post_mortem(sys.exc_info()[2])
    101 
    102    def _addSkip(self, result, reason):
    103        addSkip = getattr(result, "addSkip", None)
    104        if addSkip is not None:
    105            addSkip(self, reason)
    106        else:
    107            warnings.warn(
    108                "TestResult has no addSkip method, skips not reported",
    109                RuntimeWarning,
    110                2,
    111            )
    112            result.addSuccess(self)
    113 
    114    def assertRaisesRegxp(
    115        self, expected_exception, expected_regexp, callable_obj=None, *args, **kwargs
    116    ):
    117        return self.assertRaisesRegex(
    118            expected_exception,
    119            expected_regexp,
    120            callable_obj=None,
    121            *args,
    122            **kwargs,
    123        )
    124 
    def run(self, result=None):
        """Run the test and report its outcome to ``result``.

        Runs ``setUp()``, the test method and ``tearDown()``, then calls
        ``cleanTest()``. When ``self.expected == "fail"``, an ``Exception``
        raised by any of the three phases is reported as an expected failure,
        and a test method that does not raise is reported as an unexpected
        success.

        ``KeyboardInterrupt`` and ``UnresponsiveInstanceException`` are never
        recorded on ``result``; they are re-raised to the caller.

        :param result: object receiving the outcome; when ``None`` a default
            result is created via ``defaultTestResult()``.
        """
        # Bug 967566 suggests refactoring run, which would hopefully
        # mean getting rid of this inner function, which only sits
        # here to reduce code duplication:
        def expected_failure(result, exc_info):
            # Report an expected failure, falling back to a success when the
            # result object has no addExpectedFailure method.
            addExpectedFailure = getattr(result, "addExpectedFailure", None)
            if addExpectedFailure is not None:
                addExpectedFailure(self, exc_info)
            else:
                warnings.warn(
                    "TestResult has no addExpectedFailure method, reporting as passes",
                    RuntimeWarning,
                )
                result.addSuccess(self)

        self.start_time = time.time()
        # Remember whether the result was supplied by the caller; only a
        # locally created result gets startTestRun/stopTestRun calls.
        orig_result = result
        if result is None:
            result = self.defaultTestResult()
            startTestRun = getattr(result, "startTestRun", None)
            if startTestRun is not None:
                startTestRun()

        result.startTest(self)

        testMethod = getattr(self, self._testMethodName)
        if getattr(self.__class__, "__unittest_skip__", False) or getattr(
            testMethod, "__unittest_skip__", False
        ):
            # If the class or method was skipped.
            try:
                skip_why = getattr(
                    self.__class__, "__unittest_skip_why__", ""
                ) or getattr(testMethod, "__unittest_skip_why__", "")
                self._addSkip(result, skip_why)
            finally:
                result.stopTest(self)
            self.stop_time = time.time()
            # NOTE(review): this early return happens before the outer
            # try/finally below, so neither cleanTest() nor stopTestRun() is
            # called on this path.
            return
        try:
            success = False
            # Phase 1: setUp. Any outcome other than a clean return skips
            # the test method and tearDown.
            try:
                if self.expected == "fail":
                    try:
                        self.setUp()
                    except Exception:
                        raise expectedFailure(sys.exc_info())
                else:
                    self.setUp()
            except SkipTest as e:
                self._addSkip(result, str(e))
            except (KeyboardInterrupt, UnresponsiveInstanceException):
                raise
            except expectedFailure as e:
                expected_failure(result, e.exc_info)
            except Exception:
                self._enter_pm()
                result.addError(self, sys.exc_info())
            else:
                # Phase 2: the test method itself.
                try:
                    if self.expected == "fail":
                        try:
                            testMethod()
                        except Exception:
                            raise expectedFailure(sys.exc_info())
                        # The test was expected to fail but returned normally.
                        raise unexpectedSuccess
                    else:
                        testMethod()
                except self.failureException:
                    self._enter_pm()
                    result.addFailure(self, sys.exc_info())
                except (KeyboardInterrupt, UnresponsiveInstanceException):
                    raise
                except expectedFailure as e:
                    expected_failure(result, e.exc_info)
                except unexpectedSuccess:
                    addUnexpectedSuccess = getattr(result, "addUnexpectedSuccess", None)
                    if addUnexpectedSuccess is not None:
                        addUnexpectedSuccess(self)
                    else:
                        warnings.warn(
                            "TestResult has no addUnexpectedSuccess method, "
                            "reporting as failures",
                            RuntimeWarning,
                        )
                        result.addFailure(self, sys.exc_info())
                except SkipTest as e:
                    self._addSkip(result, str(e))
                except Exception:
                    self._enter_pm()
                    result.addError(self, sys.exc_info())
                else:
                    success = True
                # Phase 3: tearDown runs whenever setUp succeeded, regardless
                # of how the test method ended.
                try:
                    if self.expected == "fail":
                        try:
                            self.tearDown()
                        except Exception:
                            raise expectedFailure(sys.exc_info())
                    else:
                        self.tearDown()
                except (KeyboardInterrupt, UnresponsiveInstanceException):
                    raise
                except expectedFailure as e:
                    expected_failure(result, e.exc_info)
                except Exception:
                    self._enter_pm()
                    result.addError(self, sys.exc_info())
                    # A tearDown error overrides an otherwise passing test.
                    success = False
            # Here we could handle doCleanups() instead of calling cleanTest directly
            self.cleanTest()

            if success:
                result.addSuccess(self)

        finally:
            result.stopTest(self)
            if orig_result is None:
                stopTestRun = getattr(result, "stopTestRun", None)
                if stopTestRun is not None:
                    stopTestRun()
    246 
    247    @classmethod
    248    def match(cls, filename):
    249        """Determine if the specified filename should be handled by this test class.
    250 
    251        This is done by looking for a match for the filename using cls.match_re.
    252        """
    253        if not cls.match_re:
    254            return False
    255        m = cls.match_re.match(filename)
    256        return m is not None
    257 
    258    @classmethod
    259    def add_tests_to_suite(
    260        cls,
    261        mod_name,
    262        filepath,
    263        suite,
    264        testloader,
    265        marionette,
    266        fixtures,
    267        testvars,
    268        **kwargs,
    269    ):
    270        """Add all the tests in the specified file to the specified suite."""
    271        raise NotImplementedError
    272 
    273    @property
    274    def test_name(self):
    275        rel_path = None
    276        if os.path.exists(self.filepath):
    277            rel_path = self._fix_test_path(self.filepath)
    278 
    279        return f"{rel_path} {self.__class__.__name__}.{self._testMethodName}"
    280 
    281    def id(self):
    282        # TBPL starring requires that the "test name" field of a failure message
    283        # not differ over time. The test name to be used is passed to
    284        # mozlog via the test id, so this is overriden to maintain
    285        # consistency.
    286        return self.test_name
    287 
    288    def setUp(self):
    289        # Convert the marionette weakref to an object, just for the
    290        # duration of the test; this is deleted in tearDown() to prevent
    291        # a persistent circular reference which in turn would prevent
    292        # proper garbage collection.
    293        self.start_time = time.time()
    294        self.marionette = self._marionette_weakref()
    295        if self.marionette.session is None:
    296            self.marionette.start_session()
    297        self.marionette.timeout.reset()
    298 
    299        super().setUp()
    300 
    301    def cleanTest(self):
    302        self._delete_session()
    303 
    304    def _delete_session(self):
    305        if hasattr(self, "start_time"):
    306            self.duration = time.time() - self.start_time
    307        if self.marionette.session is not None:
    308            try:
    309                self.marionette.delete_session()
    310            except OSError:
    311                # Gecko has crashed?
    312                pass
    313        self.marionette = None
    314 
    315    def _fix_test_path(self, path):
    316        """Normalize a logged test path from the test package."""
    317        test_path_prefixes = [
    318            f"tests{os.path.sep}",
    319        ]
    320 
    321        path = os.path.relpath(path)
    322        for prefix in test_path_prefixes:
    323            if path.startswith(prefix):
    324                path = path[len(prefix) :]
    325                break
    326        path = path.replace("\\", "/")
    327 
    328        return path
    329 
    330 
    331 class MarionetteTestCase(CommonTestCase):
    332    match_re = re.compile(r"test_(.*)\.py$")
    333 
    def __init__(
        self, marionette_weakref, fixtures, methodName="runTest", filepath="", **kwargs
    ):
        """Store the test file location and variables, then initialise the base.

        :param filepath: path of the file the test was loaded from.
        :param kwargs: ``testvars`` is consumed here (default ``None``);
            everything else is forwarded to the parent constructor.
        """
        # testvars must be removed from kwargs before they are forwarded.
        self.testvars = kwargs.pop("testvars", None)
        self.filepath = filepath

        super().__init__(
            methodName, marionette_weakref=marionette_weakref, fixtures=fixtures, **kwargs
        )
    346 
    @classmethod
    def add_tests_to_suite(
        cls,
        mod_name,
        filepath,
        suite,
        testloader,
        marionette,
        fixtures,
        testvars,
        **kwargs,
    ):
        """Load ``filepath`` as module ``mod_name`` and add its tests to ``suite``.

        Every ``unittest.TestCase`` subclass found in the loaded module is
        instantiated once per test name reported by ``testloader``.
        """
        # since load_source caches modules, if a module is loaded with the same
        # name as another one the module would just be reloaded.
        #
        # We may end up finding too many tests in a module then, since
        # reload() only updates the module dict (so old keys are still
        # there!) see https://docs.python.org/2/library/functions.html#reload
        #
        # we get rid of that by removing the module from sys.modules, so we
        # ensure that it will be fully loaded by the load_source call.
        sys.modules.pop(mod_name, None)

        module = load_source(mod_name, filepath)

        for attr_name in dir(module):
            candidate = getattr(module, attr_name)
            is_test_class = isinstance(candidate, type) and issubclass(
                candidate, unittest.TestCase
            )
            if not is_test_class:
                continue
            for testname in testloader.getTestCaseNames(candidate):
                test = candidate(
                    weakref.ref(marionette),
                    fixtures,
                    methodName=testname,
                    filepath=filepath,
                    testvars=testvars,
                    **kwargs,
                )
                suite.addTest(test)
    389 
    390    def setUp(self):
    391        super().setUp()
    392        self.marionette.test_name = self.test_name
    393 
    394    def tearDown(self):
    395        # In the case no session is active (eg. the application was quit), start
    396        # a new session for clean-up steps.
    397        if not self.marionette.session:
    398            self.marionette.start_session()
    399 
    400        self.marionette.test_name = None
    401 
    402        super().tearDown()
    403 
    404    def wait_for_condition(self, method, timeout=30):
    405        timeout = float(timeout) + time.time()
    406        while time.time() < timeout:
    407            value = method(self.marionette)
    408            if value:
    409                return value
    410            time.sleep(0.5)
    411        raise TimeoutException("wait_for_condition timed out")