tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

test_wait.py (10604B)


      1 # This Source Code Form is subject to the terms of the Mozilla Public
      2 # License, v. 2.0. If a copy of the MPL was not distributed with this
      3 # file, You can obtain one at http://mozilla.org/MPL/2.0/.
      4 
      5 import sys
      6 import time
      7 
      8 from marionette_driver import errors, wait
      9 from marionette_driver.wait import Wait
     10 
     11 from marionette_harness import MarionetteTestCase
     12 
     13 
     14 class TickingClock(object):
     15    def __init__(self, incr=1):
     16        self.ticks = 0
     17        self.increment = incr
     18 
     19    def sleep(self, dur=None):
     20        dur = dur if dur is not None else self.increment
     21        self.ticks += dur
     22 
     23    @property
     24    def now(self):
     25        return self.ticks
     26 
     27 
     28 class SequenceClock(object):
     29    def __init__(self, times):
     30        self.times = times
     31        self.i = 0
     32 
     33    @property
     34    def now(self):
     35        if len(self.times) > self.i:
     36            self.i += 1
     37        return self.times[self.i - 1]
     38 
     39    def sleep(self, dur):
     40        pass
     41 
     42 
     43 class MockMarionette(object):
     44    def __init__(self):
     45        self.waited = 0
     46 
     47    def exception(self, e=None, wait=1):
     48        self.wait()
     49        if self.waited == wait:
     50            if e is None:
     51                e = Exception
     52            raise e
     53 
     54    def true(self, wait=1):
     55        self.wait()
     56        if self.waited == wait:
     57            return True
     58        return None
     59 
     60    def false(self, wait=1):
     61        self.wait()
     62        return False
     63 
     64    def none(self, wait=1):
     65        self.wait()
     66        return None
     67 
     68    def value(self, value, wait=1):
     69        self.wait()
     70        if self.waited == wait:
     71            return value
     72        return None
     73 
     74    def wait(self):
     75        self.waited += 1
     76 
     77 
     78 def at_third_attempt(clock, end):
     79    return clock.now == 2
     80 
     81 
     82 def now(clock, end):
     83    return True
     84 
     85 
     86 class SystemClockTest(MarionetteTestCase):
     87    def setUp(self):
     88        super(SystemClockTest, self).setUp()
     89        self.clock = wait.SystemClock()
     90 
     91    def test_construction_initializes_time(self):
     92        self.assertEqual(self.clock._time, time)
     93 
     94    def test_sleep(self):
     95        start = time.time()
     96        self.clock.sleep(0.1)
     97        end = time.time() - start
     98        self.assertGreater(end, 0)
     99 
    100    def test_time_now(self):
    101        self.assertIsNotNone(self.clock.now)
    102 
    103 
    104 class FormalWaitTest(MarionetteTestCase):
    105    def setUp(self):
    106        super(FormalWaitTest, self).setUp()
    107        self.m = MockMarionette()
    108        self.m.timeout = 123
    109 
    110    def test_construction_with_custom_timeout(self):
    111        wt = Wait(self.m, timeout=42)
    112        self.assertEqual(wt.timeout, 42)
    113 
    114    def test_construction_with_custom_interval(self):
    115        wt = Wait(self.m, interval=42)
    116        self.assertEqual(wt.interval, 42)
    117 
    118    def test_construction_with_custom_clock(self):
    119        c = TickingClock(1)
    120        wt = Wait(self.m, clock=c)
    121        self.assertEqual(wt.clock, c)
    122 
    123    def test_construction_with_custom_exception(self):
    124        wt = Wait(self.m, ignored_exceptions=Exception)
    125        self.assertIn(Exception, wt.exceptions)
    126        self.assertEqual(len(wt.exceptions), 1)
    127 
    128    def test_construction_with_custom_exception_list(self):
    129        exc = [Exception, ValueError]
    130        wt = Wait(self.m, ignored_exceptions=exc)
    131        for e in exc:
    132            self.assertIn(e, wt.exceptions)
    133        self.assertEqual(len(wt.exceptions), len(exc))
    134 
    135    def test_construction_with_custom_exception_tuple(self):
    136        exc = (Exception, ValueError)
    137        wt = Wait(self.m, ignored_exceptions=exc)
    138        for e in exc:
    139            self.assertIn(e, wt.exceptions)
    140        self.assertEqual(len(wt.exceptions), len(exc))
    141 
    142    def test_duplicate_exceptions(self):
    143        wt = Wait(self.m, ignored_exceptions=[Exception, Exception])
    144        self.assertIn(Exception, wt.exceptions)
    145        self.assertEqual(len(wt.exceptions), 1)
    146 
    147    def test_default_timeout(self):
    148        self.assertEqual(wait.DEFAULT_TIMEOUT, 5)
    149 
    150    def test_default_interval(self):
    151        self.assertEqual(wait.DEFAULT_INTERVAL, 0.1)
    152 
    153    def test_end_property(self):
    154        wt = Wait(self.m)
    155        self.assertIsNotNone(wt.end)
    156 
    157    def test_marionette_property(self):
    158        wt = Wait(self.m)
    159        self.assertEqual(wt.marionette, self.m)
    160 
    161    def test_clock_property(self):
    162        wt = Wait(self.m)
    163        self.assertIsInstance(wt.clock, wait.SystemClock)
    164 
    165    def test_timeout_uses_default_if_marionette_timeout_is_none(self):
    166        self.m.timeout = None
    167        wt = Wait(self.m)
    168        self.assertEqual(wt.timeout, wait.DEFAULT_TIMEOUT)
    169 
    170 
    171 class PredicatesTest(MarionetteTestCase):
    172    def test_until(self):
    173        c = wait.SystemClock()
    174        self.assertFalse(wait.until_pred(c, sys.maxsize))
    175        self.assertTrue(wait.until_pred(c, 0))
    176 
    177 
    178 class WaitUntilTest(MarionetteTestCase):
    179    def setUp(self):
    180        super(WaitUntilTest, self).setUp()
    181 
    182        self.m = MockMarionette()
    183        self.clock = TickingClock()
    184        self.wt = Wait(self.m, timeout=10, interval=1, clock=self.clock)
    185 
    186    def test_true(self):
    187        r = self.wt.until(lambda x: x.true())
    188        self.assertTrue(r)
    189        self.assertEqual(self.clock.ticks, 0)
    190 
    191    def test_true_within_timeout(self):
    192        r = self.wt.until(lambda x: x.true(wait=5))
    193        self.assertTrue(r)
    194        self.assertEqual(self.clock.ticks, 4)
    195 
    196    def test_timeout(self):
    197        with self.assertRaises(errors.TimeoutException):
    198            r = self.wt.until(lambda x: x.true(wait=15))
    199        self.assertEqual(self.clock.ticks, 10)
    200 
    201    def test_exception_raises_immediately(self):
    202        with self.assertRaises(TypeError):
    203            self.wt.until(lambda x: x.exception(e=TypeError))
    204        self.assertEqual(self.clock.ticks, 0)
    205 
    206    def test_ignored_exception(self):
    207        self.wt.exceptions = (TypeError,)
    208        with self.assertRaises(errors.TimeoutException):
    209            self.wt.until(lambda x: x.exception(e=TypeError))
    210 
    211    def test_ignored_exception_wrapped_in_timeoutexception(self):
    212        self.wt.exceptions = (TypeError,)
    213 
    214        exc = None
    215        try:
    216            self.wt.until(lambda x: x.exception(e=TypeError))
    217        except Exception as e:
    218            exc = e
    219 
    220        s = str(exc)
    221        self.assertIsNotNone(exc)
    222        self.assertIsInstance(exc, errors.TimeoutException)
    223        self.assertIn(", caused by {0!r}".format(TypeError), s)
    224        self.assertIn("self.wt.until(lambda x: x.exception(e=TypeError))", s)
    225 
    226    def test_ignored_exception_after_timeout_is_not_raised(self):
    227        with self.assertRaises(errors.TimeoutException):
    228            r = self.wt.until(lambda x: x.exception(wait=15))
    229        self.assertEqual(self.clock.ticks, 10)
    230 
    231    def test_keyboard_interrupt(self):
    232        with self.assertRaises(KeyboardInterrupt):
    233            self.wt.until(lambda x: x.exception(e=KeyboardInterrupt))
    234 
    235    def test_system_exit(self):
    236        with self.assertRaises(SystemExit):
    237            self.wt.until(lambda x: x.exception(SystemExit))
    238 
    239    def test_true_condition_returns_immediately(self):
    240        r = self.wt.until(lambda x: x.true())
    241        self.assertIsInstance(r, bool)
    242        self.assertTrue(r)
    243        self.assertEqual(self.clock.ticks, 0)
    244 
    245    def test_value(self):
    246        r = self.wt.until(lambda x: "foo")
    247        self.assertEqual(r, "foo")
    248        self.assertEqual(self.clock.ticks, 0)
    249 
    250    def test_custom_predicate(self):
    251        r = self.wt.until(lambda x: x.true(wait=2), is_true=at_third_attempt)
    252        self.assertTrue(r)
    253        self.assertEqual(self.clock.ticks, 1)
    254 
    255    def test_custom_predicate_times_out(self):
    256        with self.assertRaises(errors.TimeoutException):
    257            self.wt.until(lambda x: x.true(wait=4), is_true=at_third_attempt)
    258 
    259        self.assertEqual(self.clock.ticks, 2)
    260 
    261    def test_timeout_elapsed_duration(self):
    262        with self.assertRaisesRegex(
    263            errors.TimeoutException, "Timed out after 2.0 seconds"
    264        ):
    265            self.wt.until(lambda x: x.true(wait=4), is_true=at_third_attempt)
    266 
    267    def test_timeout_elapsed_rounding(self):
    268        wt = Wait(self.m, clock=SequenceClock([1, 0.01, 1]), timeout=0)
    269        with self.assertRaisesRegex(
    270            errors.TimeoutException, "Timed out after 1.0 seconds"
    271        ):
    272            wt.until(lambda x: x.true(), is_true=now)
    273 
    274    def test_timeout_elapsed_interval_by_delayed_condition_return(self):
    275        def callback(mn):
    276            self.clock.sleep(11)
    277            return mn.false()
    278 
    279        with self.assertRaisesRegex(
    280            errors.TimeoutException, "Timed out after 11.0 seconds"
    281        ):
    282            self.wt.until(callback)
    283        # With a delayed conditional return > timeout, only 1 iteration is
    284        # possible
    285        self.assertEqual(self.m.waited, 1)
    286 
    287    def test_timeout_with_delayed_condition_return(self):
    288        def callback(mn):
    289            self.clock.sleep(0.5)
    290            return mn.false()
    291 
    292        with self.assertRaisesRegex(
    293            errors.TimeoutException, "Timed out after 10.0 seconds"
    294        ):
    295            self.wt.until(callback)
    296        # With a delayed conditional return < interval, 10 iterations should be
    297        # possible
    298        self.assertEqual(self.m.waited, 10)
    299 
    300    def test_timeout_interval_shorter_than_delayed_condition_return(self):
    301        def callback(mn):
    302            self.clock.sleep(2)
    303            return mn.false()
    304 
    305        with self.assertRaisesRegex(
    306            errors.TimeoutException, "Timed out after 10.0 seconds"
    307        ):
    308            self.wt.until(callback)
    309        # With a delayed return of the conditional which takes twice that long than the interval,
    310        # half of the iterations should be possible
    311        self.assertEqual(self.m.waited, 5)
    312 
    313    def test_message(self):
    314        self.wt.exceptions = (TypeError,)
    315        exc = None
    316        try:
    317            self.wt.until(lambda x: x.exception(e=TypeError), message="hooba")
    318        except errors.TimeoutException as e:
    319            exc = e
    320 
    321        result = str(exc)
    322        self.assertIn("seconds with message: hooba, caused by", result)
    323 
    324    def test_no_message(self):
    325        self.wt.exceptions = (TypeError,)
    326        exc = None
    327        try:
    328            self.wt.until(lambda x: x.exception(e=TypeError), message="")
    329        except errors.TimeoutException as e:
    330            exc = e
    331 
    332        result = str(exc)
    333        self.assertIn("seconds, caused by", result)
    334 
    335    def test_message_has_none_as_its_value(self):
    336        self.wt.exceptions = (TypeError,)
    337        exc = None
    338        try:
    339            self.wt.until(False, None, None)
    340        except errors.TimeoutException as e:
    341            exc = e
    342 
    343        result = str(exc)
    344        self.assertNotIn("with message:", result)
    345        self.assertNotIn("secondsNone", result)