tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

test_dispatch.py (13266B)


      1 #!/usr/bin/env python
      2 #
      3 # Copyright 2011, Google Inc.
      4 # All rights reserved.
      5 #
      6 # Redistribution and use in source and binary forms, with or without
      7 # modification, are permitted provided that the following conditions are
      8 # met:
      9 #
     10 #     * Redistributions of source code must retain the above copyright
     11 # notice, this list of conditions and the following disclaimer.
     12 #     * Redistributions in binary form must reproduce the above
     13 # copyright notice, this list of conditions and the following disclaimer
     14 # in the documentation and/or other materials provided with the
     15 # distribution.
     16 #     * Neither the name of Google Inc. nor the names of its
     17 # contributors may be used to endorse or promote products derived from
     18 # this software without specific prior written permission.
     19 #
     20 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
     21 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
     22 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
     23 # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
     24 # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
     25 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
     26 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
     27 # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
     28 # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
     29 # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
     30 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
     31 """Tests for dispatch module."""
     32 
     33 from __future__ import absolute_import
     34 
     35 import os
     36 import unittest
     37 
     38 from six.moves import zip
     39 
     40 import set_sys_path  # Update sys.path to locate pywebsocket3 module.
     41 from pywebsocket3 import dispatch, handshake
     42 from test import mock
     43 
     44 
     45 _TEST_HANDLERS_DIR = os.path.join(os.path.dirname(__file__), 'testdata',
     46                                  'handlers')
     47 
     48 _TEST_HANDLERS_SUB_DIR = os.path.join(_TEST_HANDLERS_DIR, 'sub')
     49 
     50 
     51 class DispatcherTest(unittest.TestCase):
     52    """A unittest for dispatch module."""
     53    def test_normalize_path(self):
     54        self.assertEqual(
     55            os.path.abspath('/a/b').replace('\\', '/'),
     56            dispatch._normalize_path('/a/b'))
     57        self.assertEqual(
     58            os.path.abspath('/a/b').replace('\\', '/'),
     59            dispatch._normalize_path('\\a\\b'))
     60        self.assertEqual(
     61            os.path.abspath('/a/b').replace('\\', '/'),
     62            dispatch._normalize_path('/a/c/../b'))
     63        self.assertEqual(
     64            os.path.abspath('abc').replace('\\', '/'),
     65            dispatch._normalize_path('abc'))
     66 
     67    def test_converter(self):
     68        converter = dispatch._create_path_to_resource_converter('/a/b')
     69        # Python built by MSC inserts a drive name like 'C:\' via realpath().
     70        # Converter Generator expands provided path using realpath() and uses
     71        # the path including a drive name to verify the prefix.
     72        os_root = os.path.realpath('/')
     73        self.assertEqual('/h', converter(os_root + 'a/b/h_wsh.py'))
     74        self.assertEqual('/c/h', converter(os_root + 'a/b/c/h_wsh.py'))
     75        self.assertEqual(None, converter(os_root + 'a/b/h.py'))
     76        self.assertEqual(None, converter('a/b/h_wsh.py'))
     77 
     78        converter = dispatch._create_path_to_resource_converter('a/b')
     79        self.assertEqual('/h',
     80                         converter(dispatch._normalize_path('a/b/h_wsh.py')))
     81 
     82        converter = dispatch._create_path_to_resource_converter('/a/b///')
     83        self.assertEqual('/h', converter(os_root + 'a/b/h_wsh.py'))
     84        self.assertEqual(
     85            '/h', converter(dispatch._normalize_path('/a/b/../b/h_wsh.py')))
     86 
     87        converter = dispatch._create_path_to_resource_converter(
     88            '/a/../a/b/../b/')
     89        self.assertEqual('/h', converter(os_root + 'a/b/h_wsh.py'))
     90 
     91        converter = dispatch._create_path_to_resource_converter(r'\a\b')
     92        self.assertEqual('/h', converter(os_root + r'a\b\h_wsh.py'))
     93        self.assertEqual('/h', converter(os_root + r'a/b/h_wsh.py'))
     94 
     95    def test_enumerate_handler_file_paths(self):
     96        paths = list(
     97            dispatch._enumerate_handler_file_paths(_TEST_HANDLERS_DIR))
     98        paths.sort()
     99        self.assertEqual(8, len(paths))
    100        expected_paths = [
    101            os.path.join(_TEST_HANDLERS_DIR, 'abort_by_user_wsh.py'),
    102            os.path.join(_TEST_HANDLERS_DIR, 'blank_wsh.py'),
    103            os.path.join(_TEST_HANDLERS_DIR, 'origin_check_wsh.py'),
    104            os.path.join(_TEST_HANDLERS_DIR, 'sub',
    105                         'exception_in_transfer_wsh.py'),
    106            os.path.join(_TEST_HANDLERS_DIR, 'sub', 'non_callable_wsh.py'),
    107            os.path.join(_TEST_HANDLERS_DIR, 'sub', 'plain_wsh.py'),
    108            os.path.join(_TEST_HANDLERS_DIR, 'sub',
    109                         'wrong_handshake_sig_wsh.py'),
    110            os.path.join(_TEST_HANDLERS_DIR, 'sub',
    111                         'wrong_transfer_sig_wsh.py'),
    112        ]
    113        for expected, actual in zip(expected_paths, paths):
    114            self.assertEqual(expected, actual)
    115 
    116    def test_source_handler_file(self):
    117        self.assertRaises(dispatch.DispatchException,
    118                          dispatch._source_handler_file, '')
    119        self.assertRaises(dispatch.DispatchException,
    120                          dispatch._source_handler_file, 'def')
    121        self.assertRaises(dispatch.DispatchException,
    122                          dispatch._source_handler_file, '1/0')
    123        self.assertTrue(
    124            dispatch._source_handler_file(
    125                'def web_socket_do_extra_handshake(request):pass\n'
    126                'def web_socket_transfer_data(request):pass\n'))
    127 
    128    def test_source_warnings(self):
    129        dispatcher = dispatch.Dispatcher(_TEST_HANDLERS_DIR, None)
    130        warnings = dispatcher.source_warnings()
    131        warnings.sort()
    132        expected_warnings = [
    133            (os.path.realpath(os.path.join(_TEST_HANDLERS_DIR, 'blank_wsh.py'))
    134             + ': web_socket_do_extra_handshake is not defined.'),
    135            (os.path.realpath(
    136                os.path.join(_TEST_HANDLERS_DIR, 'sub', 'non_callable_wsh.py'))
    137             + ': web_socket_do_extra_handshake is not callable.'),
    138            (os.path.realpath(
    139                os.path.join(_TEST_HANDLERS_DIR, 'sub',
    140                             'wrong_handshake_sig_wsh.py')) +
    141             ': web_socket_do_extra_handshake is not defined.'),
    142            (os.path.realpath(
    143                os.path.join(_TEST_HANDLERS_DIR, 'sub',
    144                             'wrong_transfer_sig_wsh.py')) +
    145             ': web_socket_transfer_data is not defined.'),
    146        ]
    147        self.assertEqual(4, len(warnings))
    148        for expected, actual in zip(expected_warnings, warnings):
    149            self.assertEqual(expected, actual)
    150 
    151    def test_do_extra_handshake(self):
    152        dispatcher = dispatch.Dispatcher(_TEST_HANDLERS_DIR, None)
    153        request = mock.MockRequest()
    154        request.ws_resource = '/origin_check'
    155        request.ws_origin = 'http://example.com'
    156        dispatcher.do_extra_handshake(request)  # Must not raise exception.
    157 
    158        request.ws_origin = 'http://bad.example.com'
    159        try:
    160            dispatcher.do_extra_handshake(request)
    161            self.fail('Could not catch HandshakeException with 403 status')
    162        except handshake.HandshakeException as e:
    163            self.assertEqual(403, e.status)
    164        except Exception as e:
    165            self.fail('Unexpected exception: %r' % e)
    166 
    167    def test_abort_extra_handshake(self):
    168        dispatcher = dispatch.Dispatcher(_TEST_HANDLERS_DIR, None)
    169        request = mock.MockRequest()
    170        request.ws_resource = '/abort_by_user'
    171        self.assertRaises(handshake.AbortedByUserException,
    172                          dispatcher.do_extra_handshake, request)
    173 
    174    def test_transfer_data(self):
    175        dispatcher = dispatch.Dispatcher(_TEST_HANDLERS_DIR, None)
    176 
    177        request = mock.MockRequest(
    178            connection=mock.MockConn(b'\x88\x02\x03\xe8'))
    179        request.ws_resource = '/origin_check'
    180        request.ws_protocol = 'p1'
    181        dispatcher.transfer_data(request)
    182        self.assertEqual(
    183            b'origin_check_wsh.py is called for /origin_check, p1'
    184            b'\x88\x02\x03\xe8', request.connection.written_data())
    185 
    186        request = mock.MockRequest(
    187            connection=mock.MockConn(b'\x88\x02\x03\xe8'))
    188        request.ws_resource = '/sub/plain'
    189        request.ws_protocol = None
    190        dispatcher.transfer_data(request)
    191        self.assertEqual(
    192            b'sub/plain_wsh.py is called for /sub/plain, None'
    193            b'\x88\x02\x03\xe8', request.connection.written_data())
    194 
    195        request = mock.MockRequest(
    196            connection=mock.MockConn(b'\x88\x02\x03\xe8'))
    197        request.ws_resource = '/sub/plain?'
    198        request.ws_protocol = None
    199        dispatcher.transfer_data(request)
    200        self.assertEqual(
    201            b'sub/plain_wsh.py is called for /sub/plain?, None'
    202            b'\x88\x02\x03\xe8', request.connection.written_data())
    203 
    204        request = mock.MockRequest(
    205            connection=mock.MockConn(b'\x88\x02\x03\xe8'))
    206        request.ws_resource = '/sub/plain?q=v'
    207        request.ws_protocol = None
    208        dispatcher.transfer_data(request)
    209        self.assertEqual(
    210            b'sub/plain_wsh.py is called for /sub/plain?q=v, None'
    211            b'\x88\x02\x03\xe8', request.connection.written_data())
    212 
    213    def test_transfer_data_no_handler(self):
    214        dispatcher = dispatch.Dispatcher(_TEST_HANDLERS_DIR, None)
    215        for resource in [
    216                '/blank', '/sub/non_callable', '/sub/no_wsh_at_the_end',
    217                '/does/not/exist'
    218        ]:
    219            request = mock.MockRequest(connection=mock.MockConn(b''))
    220            request.ws_resource = resource
    221            request.ws_protocol = 'p2'
    222            try:
    223                dispatcher.transfer_data(request)
    224                self.fail()
    225            except dispatch.DispatchException as e:
    226                self.assertTrue(str(e).find('No handler') != -1)
    227            except Exception:
    228                self.fail()
    229 
    230    def test_transfer_data_handler_exception(self):
    231        dispatcher = dispatch.Dispatcher(_TEST_HANDLERS_DIR, None)
    232        request = mock.MockRequest(connection=mock.MockConn(b''))
    233        request.ws_resource = '/sub/exception_in_transfer'
    234        request.ws_protocol = 'p3'
    235        try:
    236            dispatcher.transfer_data(request)
    237            self.fail()
    238        except Exception as e:
    239            self.assertTrue(
    240                str(e).find('Intentional') != -1,
    241                'Unexpected exception: %s' % e)
    242 
    243    def test_abort_transfer_data(self):
    244        dispatcher = dispatch.Dispatcher(_TEST_HANDLERS_DIR, None)
    245        request = mock.MockRequest()
    246        request.ws_resource = '/abort_by_user'
    247        self.assertRaises(handshake.AbortedByUserException,
    248                          dispatcher.transfer_data, request)
    249 
    250    def test_scan_dir(self):
    251        disp = dispatch.Dispatcher(_TEST_HANDLERS_DIR, None)
    252        self.assertEqual(4, len(disp._handler_suite_map))
    253        self.assertTrue('/origin_check' in disp._handler_suite_map)
    254        self.assertTrue(
    255            '/sub/exception_in_transfer' in disp._handler_suite_map)
    256        self.assertTrue('/sub/plain' in disp._handler_suite_map)
    257 
    258    def test_scan_sub_dir(self):
    259        disp = dispatch.Dispatcher(_TEST_HANDLERS_DIR, _TEST_HANDLERS_SUB_DIR)
    260        self.assertEqual(2, len(disp._handler_suite_map))
    261        self.assertFalse('/origin_check' in disp._handler_suite_map)
    262        self.assertTrue(
    263            '/sub/exception_in_transfer' in disp._handler_suite_map)
    264        self.assertTrue('/sub/plain' in disp._handler_suite_map)
    265 
    266    def test_scan_sub_dir_as_root(self):
    267        disp = dispatch.Dispatcher(_TEST_HANDLERS_SUB_DIR,
    268                                   _TEST_HANDLERS_SUB_DIR)
    269        self.assertEqual(2, len(disp._handler_suite_map))
    270        self.assertFalse('/origin_check' in disp._handler_suite_map)
    271        self.assertFalse(
    272            '/sub/exception_in_transfer' in disp._handler_suite_map)
    273        self.assertFalse('/sub/plain' in disp._handler_suite_map)
    274        self.assertTrue('/exception_in_transfer' in disp._handler_suite_map)
    275        self.assertTrue('/plain' in disp._handler_suite_map)
    276 
    277    def test_scan_dir_must_under_root(self):
    278        dispatch.Dispatcher('a/b', 'a/b/c')  # OK
    279        dispatch.Dispatcher('a/b///', 'a/b')  # OK
    280        self.assertRaises(dispatch.DispatchException, dispatch.Dispatcher,
    281                          'a/b/c', 'a/b')
    282 
    283    def test_resource_path_alias(self):
    284        disp = dispatch.Dispatcher(_TEST_HANDLERS_DIR, None)
    285        disp.add_resource_path_alias('/', '/origin_check')
    286        self.assertEqual(5, len(disp._handler_suite_map))
    287        self.assertTrue('/origin_check' in disp._handler_suite_map)
    288        self.assertTrue(
    289            '/sub/exception_in_transfer' in disp._handler_suite_map)
    290        self.assertTrue('/sub/plain' in disp._handler_suite_map)
    291        self.assertTrue('/' in disp._handler_suite_map)
    292        self.assertRaises(dispatch.DispatchException,
    293                          disp.add_resource_path_alias, '/alias', '/not-exist')
    294 
    295 
    296 if __name__ == '__main__':
    297    unittest.main()
    298 
    299 # vi:sts=4 sw=4 et