testcases.py (13907B)
1 # This Source Code Form is subject to the terms of the Mozilla Public 2 # License, v. 2.0. If a copy of the MPL was not distributed with this 3 # file, You can obtain one at http://mozilla.org/MPL/2.0/. 4 5 6 import os 7 import re 8 import sys 9 import time 10 import unittest 11 import warnings 12 import weakref 13 from unittest.case import SkipTest 14 15 from marionette_driver.errors import TimeoutException, UnresponsiveInstanceException 16 from mozfile import load_source 17 from mozlog import get_default_logger 18 19 20 # With Python 3 both expectedFailure and unexpectedSuccess are 21 # available in unittest/case.py but won't work here because both 22 # do not inherit from BaseException. And that's currently needed 23 # in our custom test status handling in `run()`. 24 class expectedFailure(Exception): 25 """ 26 Raise this when a test is expected to fail. 27 28 This is an implementation detail. 29 """ 30 31 def __init__(self, exc_info): 32 super().__init__() 33 self.exc_info = exc_info 34 35 36 class unexpectedSuccess(Exception): 37 """ 38 The test was supposed to fail, but it didn't! 39 """ 40 41 pass 42 43 44 def _wraps_parameterized(func, func_suffix, args, kwargs): 45 """Internal: Decorator used in class MetaParameterized.""" 46 47 def wrapper(self): 48 return func(self, *args, **kwargs) 49 50 wrapper.__name__ = func.__name__ + "_" + str(func_suffix) 51 wrapper.__doc__ = f"[{func_suffix}] {func.__doc__}" 52 return wrapper 53 54 55 class MetaParameterized(type): 56 """ 57 A metaclass that allow a class to use decorators. 58 59 It can be used like :func:`parameterized` 60 or :func:`with_parameters` to generate new methods. 61 """ 62 63 RE_ESCAPE_BAD_CHARS = re.compile(r"[\.\(\) -/]") 64 65 def __new__(cls, name, bases, attrs): 66 for k, v in list(attrs.items()): 67 if callable(v) and hasattr(v, "metaparameters"): 68 for func_suffix, args, kwargs in v.metaparameters: 69 func_suffix = cls.RE_ESCAPE_BAD_CHARS.sub("_", func_suffix) 70 wrapper = _wraps_parameterized(v, func_suffix, args, kwargs) 71 if wrapper.__name__ in attrs: 72 raise KeyError( 73 f"{wrapper.__name__} is already a defined method on {name}" 74 ) 75 attrs[wrapper.__name__] = wrapper 76 del attrs[k] 77 78 return type.__new__(cls, name, bases, attrs) 79 80 81 class CommonTestCase(unittest.TestCase, metaclass=MetaParameterized): 82 match_re = None 83 failureException = AssertionError 84 pydebugger = None 85 86 def __init__(self, methodName, marionette_weakref, fixtures, **kwargs): 87 super().__init__(methodName) 88 self.methodName = methodName 89 90 self._marionette_weakref = marionette_weakref 91 self.fixtures = fixtures 92 93 self.duration = 0 94 self.start_time = 0 95 self.expected = kwargs.pop("expected", "pass") 96 self.logger = get_default_logger() 97 98 def _enter_pm(self): 99 if self.pydebugger: 100 self.pydebugger.post_mortem(sys.exc_info()[2]) 101 102 def _addSkip(self, result, reason): 103 addSkip = getattr(result, "addSkip", None) 104 if addSkip is not None: 105 addSkip(self, reason) 106 else: 107 warnings.warn( 108 "TestResult has no addSkip method, skips not reported", 109 RuntimeWarning, 110 2, 111 ) 112 result.addSuccess(self) 113 114 def assertRaisesRegxp( 115 self, expected_exception, expected_regexp, callable_obj=None, *args, **kwargs 116 ): 117 return self.assertRaisesRegex( 118 expected_exception, 119 expected_regexp, 120 callable_obj=None, 121 *args, 122 **kwargs, 123 ) 124 125 def run(self, result=None): 126 # Bug 967566 suggests refactoring run, which would hopefully 127 # mean getting rid of this inner function, which only sits 128 # here to reduce code duplication: 129 def expected_failure(result, exc_info): 130 addExpectedFailure = getattr(result, "addExpectedFailure", None) 131 if addExpectedFailure is not None: 132 addExpectedFailure(self, exc_info) 133 else: 134 warnings.warn( 135 "TestResult has no addExpectedFailure method, reporting as passes", 136 RuntimeWarning, 137 ) 138 result.addSuccess(self) 139 140 self.start_time = time.time() 141 orig_result = result 142 if result is None: 143 result = self.defaultTestResult() 144 startTestRun = getattr(result, "startTestRun", None) 145 if startTestRun is not None: 146 startTestRun() 147 148 result.startTest(self) 149 150 testMethod = getattr(self, self._testMethodName) 151 if getattr(self.__class__, "__unittest_skip__", False) or getattr( 152 testMethod, "__unittest_skip__", False 153 ): 154 # If the class or method was skipped. 155 try: 156 skip_why = getattr( 157 self.__class__, "__unittest_skip_why__", "" 158 ) or getattr(testMethod, "__unittest_skip_why__", "") 159 self._addSkip(result, skip_why) 160 finally: 161 result.stopTest(self) 162 self.stop_time = time.time() 163 return 164 try: 165 success = False 166 try: 167 if self.expected == "fail": 168 try: 169 self.setUp() 170 except Exception: 171 raise expectedFailure(sys.exc_info()) 172 else: 173 self.setUp() 174 except SkipTest as e: 175 self._addSkip(result, str(e)) 176 except (KeyboardInterrupt, UnresponsiveInstanceException): 177 raise 178 except expectedFailure as e: 179 expected_failure(result, e.exc_info) 180 except Exception: 181 self._enter_pm() 182 result.addError(self, sys.exc_info()) 183 else: 184 try: 185 if self.expected == "fail": 186 try: 187 testMethod() 188 except Exception: 189 raise expectedFailure(sys.exc_info()) 190 raise unexpectedSuccess 191 else: 192 testMethod() 193 except self.failureException: 194 self._enter_pm() 195 result.addFailure(self, sys.exc_info()) 196 except (KeyboardInterrupt, UnresponsiveInstanceException): 197 raise 198 except expectedFailure as e: 199 expected_failure(result, e.exc_info) 200 except unexpectedSuccess: 201 addUnexpectedSuccess = getattr(result, "addUnexpectedSuccess", None) 202 if addUnexpectedSuccess is not None: 203 addUnexpectedSuccess(self) 204 else: 205 warnings.warn( 206 "TestResult has no addUnexpectedSuccess method, " 207 "reporting as failures", 208 RuntimeWarning, 209 ) 210 result.addFailure(self, sys.exc_info()) 211 except SkipTest as e: 212 self._addSkip(result, str(e)) 213 except Exception: 214 self._enter_pm() 215 result.addError(self, sys.exc_info()) 216 else: 217 success = True 218 try: 219 if self.expected == "fail": 220 try: 221 self.tearDown() 222 except Exception: 223 raise expectedFailure(sys.exc_info()) 224 else: 225 self.tearDown() 226 except (KeyboardInterrupt, UnresponsiveInstanceException): 227 raise 228 except expectedFailure as e: 229 expected_failure(result, e.exc_info) 230 except Exception: 231 self._enter_pm() 232 result.addError(self, sys.exc_info()) 233 success = False 234 # Here we could handle doCleanups() instead of calling cleanTest directly 235 self.cleanTest() 236 237 if success: 238 result.addSuccess(self) 239 240 finally: 241 result.stopTest(self) 242 if orig_result is None: 243 stopTestRun = getattr(result, "stopTestRun", None) 244 if stopTestRun is not None: 245 stopTestRun() 246 247 @classmethod 248 def match(cls, filename): 249 """Determine if the specified filename should be handled by this test class. 250 251 This is done by looking for a match for the filename using cls.match_re. 252 """ 253 if not cls.match_re: 254 return False 255 m = cls.match_re.match(filename) 256 return m is not None 257 258 @classmethod 259 def add_tests_to_suite( 260 cls, 261 mod_name, 262 filepath, 263 suite, 264 testloader, 265 marionette, 266 fixtures, 267 testvars, 268 **kwargs, 269 ): 270 """Add all the tests in the specified file to the specified suite.""" 271 raise NotImplementedError 272 273 @property 274 def test_name(self): 275 rel_path = None 276 if os.path.exists(self.filepath): 277 rel_path = self._fix_test_path(self.filepath) 278 279 return f"{rel_path} {self.__class__.__name__}.{self._testMethodName}" 280 281 def id(self): 282 # TBPL starring requires that the "test name" field of a failure message 283 # not differ over time. The test name to be used is passed to 284 # mozlog via the test id, so this is overriden to maintain 285 # consistency. 286 return self.test_name 287 288 def setUp(self): 289 # Convert the marionette weakref to an object, just for the 290 # duration of the test; this is deleted in tearDown() to prevent 291 # a persistent circular reference which in turn would prevent 292 # proper garbage collection. 293 self.start_time = time.time() 294 self.marionette = self._marionette_weakref() 295 if self.marionette.session is None: 296 self.marionette.start_session() 297 self.marionette.timeout.reset() 298 299 super().setUp() 300 301 def cleanTest(self): 302 self._delete_session() 303 304 def _delete_session(self): 305 if hasattr(self, "start_time"): 306 self.duration = time.time() - self.start_time 307 if self.marionette.session is not None: 308 try: 309 self.marionette.delete_session() 310 except OSError: 311 # Gecko has crashed? 312 pass 313 self.marionette = None 314 315 def _fix_test_path(self, path): 316 """Normalize a logged test path from the test package.""" 317 test_path_prefixes = [ 318 f"tests{os.path.sep}", 319 ] 320 321 path = os.path.relpath(path) 322 for prefix in test_path_prefixes: 323 if path.startswith(prefix): 324 path = path[len(prefix) :] 325 break 326 path = path.replace("\\", "/") 327 328 return path 329 330 331 class MarionetteTestCase(CommonTestCase): 332 match_re = re.compile(r"test_(.*)\.py$") 333 334 def __init__( 335 self, marionette_weakref, fixtures, methodName="runTest", filepath="", **kwargs 336 ): 337 self.filepath = filepath 338 self.testvars = kwargs.pop("testvars", None) 339 340 super().__init__( 341 methodName, 342 marionette_weakref=marionette_weakref, 343 fixtures=fixtures, 344 **kwargs, 345 ) 346 347 @classmethod 348 def add_tests_to_suite( 349 cls, 350 mod_name, 351 filepath, 352 suite, 353 testloader, 354 marionette, 355 fixtures, 356 testvars, 357 **kwargs, 358 ): 359 # since load_source caches modules, if a module is loaded with the same 360 # name as another one the module would just be reloaded. 361 # 362 # We may end up by finding too many test in a module then since reload() 363 # only update the module dict (so old keys are still there!) see 364 # https://docs.python.org/2/library/functions.html#reload 365 # 366 # we get rid of that by removing the module from sys.modules, so we 367 # ensure that it will be fully loaded by the imp.load_source call. 368 369 if mod_name in sys.modules: 370 del sys.modules[mod_name] 371 372 test_mod = load_source(mod_name, filepath) 373 374 for name in dir(test_mod): 375 obj = getattr(test_mod, name) 376 if isinstance(obj, type) and issubclass(obj, unittest.TestCase): 377 testnames = testloader.getTestCaseNames(obj) 378 for testname in testnames: 379 suite.addTest( 380 obj( 381 weakref.ref(marionette), 382 fixtures, 383 methodName=testname, 384 filepath=filepath, 385 testvars=testvars, 386 **kwargs, 387 ) 388 ) 389 390 def setUp(self): 391 super().setUp() 392 self.marionette.test_name = self.test_name 393 394 def tearDown(self): 395 # In the case no session is active (eg. the application was quit), start 396 # a new session for clean-up steps. 397 if not self.marionette.session: 398 self.marionette.start_session() 399 400 self.marionette.test_name = None 401 402 super().tearDown() 403 404 def wait_for_condition(self, method, timeout=30): 405 timeout = float(timeout) + time.time() 406 while time.time() < timeout: 407 value = method(self.marionette) 408 if value: 409 return value 410 time.sleep(0.5) 411 raise TimeoutException("wait_for_condition timed out")