test_dispatch.py (13266B)
1 #!/usr/bin/env python 2 # 3 # Copyright 2011, Google Inc. 4 # All rights reserved. 5 # 6 # Redistribution and use in source and binary forms, with or without 7 # modification, are permitted provided that the following conditions are 8 # met: 9 # 10 # * Redistributions of source code must retain the above copyright 11 # notice, this list of conditions and the following disclaimer. 12 # * Redistributions in binary form must reproduce the above 13 # copyright notice, this list of conditions and the following disclaimer 14 # in the documentation and/or other materials provided with the 15 # distribution. 16 # * Neither the name of Google Inc. nor the names of its 17 # contributors may be used to endorse or promote products derived from 18 # this software without specific prior written permission. 19 # 20 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 21 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 22 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 23 # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 24 # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 25 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 26 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 27 # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 28 # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 29 # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 30 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 31 """Tests for dispatch module.""" 32 33 from __future__ import absolute_import 34 35 import os 36 import unittest 37 38 from six.moves import zip 39 40 import set_sys_path # Update sys.path to locate pywebsocket3 module. 41 from pywebsocket3 import dispatch, handshake 42 from test import mock 43 44 45 _TEST_HANDLERS_DIR = os.path.join(os.path.dirname(__file__), 'testdata', 46 'handlers') 47 48 _TEST_HANDLERS_SUB_DIR = os.path.join(_TEST_HANDLERS_DIR, 'sub') 49 50 51 class DispatcherTest(unittest.TestCase): 52 """A unittest for dispatch module.""" 53 def test_normalize_path(self): 54 self.assertEqual( 55 os.path.abspath('/a/b').replace('\\', '/'), 56 dispatch._normalize_path('/a/b')) 57 self.assertEqual( 58 os.path.abspath('/a/b').replace('\\', '/'), 59 dispatch._normalize_path('\\a\\b')) 60 self.assertEqual( 61 os.path.abspath('/a/b').replace('\\', '/'), 62 dispatch._normalize_path('/a/c/../b')) 63 self.assertEqual( 64 os.path.abspath('abc').replace('\\', '/'), 65 dispatch._normalize_path('abc')) 66 67 def test_converter(self): 68 converter = dispatch._create_path_to_resource_converter('/a/b') 69 # Python built by MSC inserts a drive name like 'C:\' via realpath(). 70 # Converter Generator expands provided path using realpath() and uses 71 # the path including a drive name to verify the prefix. 72 os_root = os.path.realpath('/') 73 self.assertEqual('/h', converter(os_root + 'a/b/h_wsh.py')) 74 self.assertEqual('/c/h', converter(os_root + 'a/b/c/h_wsh.py')) 75 self.assertEqual(None, converter(os_root + 'a/b/h.py')) 76 self.assertEqual(None, converter('a/b/h_wsh.py')) 77 78 converter = dispatch._create_path_to_resource_converter('a/b') 79 self.assertEqual('/h', 80 converter(dispatch._normalize_path('a/b/h_wsh.py'))) 81 82 converter = dispatch._create_path_to_resource_converter('/a/b///') 83 self.assertEqual('/h', converter(os_root + 'a/b/h_wsh.py')) 84 self.assertEqual( 85 '/h', converter(dispatch._normalize_path('/a/b/../b/h_wsh.py'))) 86 87 converter = dispatch._create_path_to_resource_converter( 88 '/a/../a/b/../b/') 89 self.assertEqual('/h', converter(os_root + 'a/b/h_wsh.py')) 90 91 converter = dispatch._create_path_to_resource_converter(r'\a\b') 92 self.assertEqual('/h', converter(os_root + r'a\b\h_wsh.py')) 93 self.assertEqual('/h', converter(os_root + r'a/b/h_wsh.py')) 94 95 def test_enumerate_handler_file_paths(self): 96 paths = list( 97 dispatch._enumerate_handler_file_paths(_TEST_HANDLERS_DIR)) 98 paths.sort() 99 self.assertEqual(8, len(paths)) 100 expected_paths = [ 101 os.path.join(_TEST_HANDLERS_DIR, 'abort_by_user_wsh.py'), 102 os.path.join(_TEST_HANDLERS_DIR, 'blank_wsh.py'), 103 os.path.join(_TEST_HANDLERS_DIR, 'origin_check_wsh.py'), 104 os.path.join(_TEST_HANDLERS_DIR, 'sub', 105 'exception_in_transfer_wsh.py'), 106 os.path.join(_TEST_HANDLERS_DIR, 'sub', 'non_callable_wsh.py'), 107 os.path.join(_TEST_HANDLERS_DIR, 'sub', 'plain_wsh.py'), 108 os.path.join(_TEST_HANDLERS_DIR, 'sub', 109 'wrong_handshake_sig_wsh.py'), 110 os.path.join(_TEST_HANDLERS_DIR, 'sub', 111 'wrong_transfer_sig_wsh.py'), 112 ] 113 for expected, actual in zip(expected_paths, paths): 114 self.assertEqual(expected, actual) 115 116 def test_source_handler_file(self): 117 self.assertRaises(dispatch.DispatchException, 118 dispatch._source_handler_file, '') 119 self.assertRaises(dispatch.DispatchException, 120 dispatch._source_handler_file, 'def') 121 self.assertRaises(dispatch.DispatchException, 122 dispatch._source_handler_file, '1/0') 123 self.assertTrue( 124 dispatch._source_handler_file( 125 'def web_socket_do_extra_handshake(request):pass\n' 126 'def web_socket_transfer_data(request):pass\n')) 127 128 def test_source_warnings(self): 129 dispatcher = dispatch.Dispatcher(_TEST_HANDLERS_DIR, None) 130 warnings = dispatcher.source_warnings() 131 warnings.sort() 132 expected_warnings = [ 133 (os.path.realpath(os.path.join(_TEST_HANDLERS_DIR, 'blank_wsh.py')) 134 + ': web_socket_do_extra_handshake is not defined.'), 135 (os.path.realpath( 136 os.path.join(_TEST_HANDLERS_DIR, 'sub', 'non_callable_wsh.py')) 137 + ': web_socket_do_extra_handshake is not callable.'), 138 (os.path.realpath( 139 os.path.join(_TEST_HANDLERS_DIR, 'sub', 140 'wrong_handshake_sig_wsh.py')) + 141 ': web_socket_do_extra_handshake is not defined.'), 142 (os.path.realpath( 143 os.path.join(_TEST_HANDLERS_DIR, 'sub', 144 'wrong_transfer_sig_wsh.py')) + 145 ': web_socket_transfer_data is not defined.'), 146 ] 147 self.assertEqual(4, len(warnings)) 148 for expected, actual in zip(expected_warnings, warnings): 149 self.assertEqual(expected, actual) 150 151 def test_do_extra_handshake(self): 152 dispatcher = dispatch.Dispatcher(_TEST_HANDLERS_DIR, None) 153 request = mock.MockRequest() 154 request.ws_resource = '/origin_check' 155 request.ws_origin = 'http://example.com' 156 dispatcher.do_extra_handshake(request) # Must not raise exception. 157 158 request.ws_origin = 'http://bad.example.com' 159 try: 160 dispatcher.do_extra_handshake(request) 161 self.fail('Could not catch HandshakeException with 403 status') 162 except handshake.HandshakeException as e: 163 self.assertEqual(403, e.status) 164 except Exception as e: 165 self.fail('Unexpected exception: %r' % e) 166 167 def test_abort_extra_handshake(self): 168 dispatcher = dispatch.Dispatcher(_TEST_HANDLERS_DIR, None) 169 request = mock.MockRequest() 170 request.ws_resource = '/abort_by_user' 171 self.assertRaises(handshake.AbortedByUserException, 172 dispatcher.do_extra_handshake, request) 173 174 def test_transfer_data(self): 175 dispatcher = dispatch.Dispatcher(_TEST_HANDLERS_DIR, None) 176 177 request = mock.MockRequest( 178 connection=mock.MockConn(b'\x88\x02\x03\xe8')) 179 request.ws_resource = '/origin_check' 180 request.ws_protocol = 'p1' 181 dispatcher.transfer_data(request) 182 self.assertEqual( 183 b'origin_check_wsh.py is called for /origin_check, p1' 184 b'\x88\x02\x03\xe8', request.connection.written_data()) 185 186 request = mock.MockRequest( 187 connection=mock.MockConn(b'\x88\x02\x03\xe8')) 188 request.ws_resource = '/sub/plain' 189 request.ws_protocol = None 190 dispatcher.transfer_data(request) 191 self.assertEqual( 192 b'sub/plain_wsh.py is called for /sub/plain, None' 193 b'\x88\x02\x03\xe8', request.connection.written_data()) 194 195 request = mock.MockRequest( 196 connection=mock.MockConn(b'\x88\x02\x03\xe8')) 197 request.ws_resource = '/sub/plain?' 198 request.ws_protocol = None 199 dispatcher.transfer_data(request) 200 self.assertEqual( 201 b'sub/plain_wsh.py is called for /sub/plain?, None' 202 b'\x88\x02\x03\xe8', request.connection.written_data()) 203 204 request = mock.MockRequest( 205 connection=mock.MockConn(b'\x88\x02\x03\xe8')) 206 request.ws_resource = '/sub/plain?q=v' 207 request.ws_protocol = None 208 dispatcher.transfer_data(request) 209 self.assertEqual( 210 b'sub/plain_wsh.py is called for /sub/plain?q=v, None' 211 b'\x88\x02\x03\xe8', request.connection.written_data()) 212 213 def test_transfer_data_no_handler(self): 214 dispatcher = dispatch.Dispatcher(_TEST_HANDLERS_DIR, None) 215 for resource in [ 216 '/blank', '/sub/non_callable', '/sub/no_wsh_at_the_end', 217 '/does/not/exist' 218 ]: 219 request = mock.MockRequest(connection=mock.MockConn(b'')) 220 request.ws_resource = resource 221 request.ws_protocol = 'p2' 222 try: 223 dispatcher.transfer_data(request) 224 self.fail() 225 except dispatch.DispatchException as e: 226 self.assertTrue(str(e).find('No handler') != -1) 227 except Exception: 228 self.fail() 229 230 def test_transfer_data_handler_exception(self): 231 dispatcher = dispatch.Dispatcher(_TEST_HANDLERS_DIR, None) 232 request = mock.MockRequest(connection=mock.MockConn(b'')) 233 request.ws_resource = '/sub/exception_in_transfer' 234 request.ws_protocol = 'p3' 235 try: 236 dispatcher.transfer_data(request) 237 self.fail() 238 except Exception as e: 239 self.assertTrue( 240 str(e).find('Intentional') != -1, 241 'Unexpected exception: %s' % e) 242 243 def test_abort_transfer_data(self): 244 dispatcher = dispatch.Dispatcher(_TEST_HANDLERS_DIR, None) 245 request = mock.MockRequest() 246 request.ws_resource = '/abort_by_user' 247 self.assertRaises(handshake.AbortedByUserException, 248 dispatcher.transfer_data, request) 249 250 def test_scan_dir(self): 251 disp = dispatch.Dispatcher(_TEST_HANDLERS_DIR, None) 252 self.assertEqual(4, len(disp._handler_suite_map)) 253 self.assertTrue('/origin_check' in disp._handler_suite_map) 254 self.assertTrue( 255 '/sub/exception_in_transfer' in disp._handler_suite_map) 256 self.assertTrue('/sub/plain' in disp._handler_suite_map) 257 258 def test_scan_sub_dir(self): 259 disp = dispatch.Dispatcher(_TEST_HANDLERS_DIR, _TEST_HANDLERS_SUB_DIR) 260 self.assertEqual(2, len(disp._handler_suite_map)) 261 self.assertFalse('/origin_check' in disp._handler_suite_map) 262 self.assertTrue( 263 '/sub/exception_in_transfer' in disp._handler_suite_map) 264 self.assertTrue('/sub/plain' in disp._handler_suite_map) 265 266 def test_scan_sub_dir_as_root(self): 267 disp = dispatch.Dispatcher(_TEST_HANDLERS_SUB_DIR, 268 _TEST_HANDLERS_SUB_DIR) 269 self.assertEqual(2, len(disp._handler_suite_map)) 270 self.assertFalse('/origin_check' in disp._handler_suite_map) 271 self.assertFalse( 272 '/sub/exception_in_transfer' in disp._handler_suite_map) 273 self.assertFalse('/sub/plain' in disp._handler_suite_map) 274 self.assertTrue('/exception_in_transfer' in disp._handler_suite_map) 275 self.assertTrue('/plain' in disp._handler_suite_map) 276 277 def test_scan_dir_must_under_root(self): 278 dispatch.Dispatcher('a/b', 'a/b/c') # OK 279 dispatch.Dispatcher('a/b///', 'a/b') # OK 280 self.assertRaises(dispatch.DispatchException, dispatch.Dispatcher, 281 'a/b/c', 'a/b') 282 283 def test_resource_path_alias(self): 284 disp = dispatch.Dispatcher(_TEST_HANDLERS_DIR, None) 285 disp.add_resource_path_alias('/', '/origin_check') 286 self.assertEqual(5, len(disp._handler_suite_map)) 287 self.assertTrue('/origin_check' in disp._handler_suite_map) 288 self.assertTrue( 289 '/sub/exception_in_transfer' in disp._handler_suite_map) 290 self.assertTrue('/sub/plain' in disp._handler_suite_map) 291 self.assertTrue('/' in disp._handler_suite_map) 292 self.assertRaises(dispatch.DispatchException, 293 disp.add_resource_path_alias, '/alias', '/not-exist') 294 295 296 if __name__ == '__main__': 297 unittest.main() 298 299 # vi:sts=4 sw=4 et