test_util.py (7290B)
1 #!/usr/bin/env python 2 # 3 # Copyright 2011, Google Inc. 4 # All rights reserved. 5 # 6 # Redistribution and use in source and binary forms, with or without 7 # modification, are permitted provided that the following conditions are 8 # met: 9 # 10 # * Redistributions of source code must retain the above copyright 11 # notice, this list of conditions and the following disclaimer. 12 # * Redistributions in binary form must reproduce the above 13 # copyright notice, this list of conditions and the following disclaimer 14 # in the documentation and/or other materials provided with the 15 # distribution. 16 # * Neither the name of Google Inc. nor the names of its 17 # contributors may be used to endorse or promote products derived from 18 # this software without specific prior written permission. 19 # 20 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 21 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 22 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 23 # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 24 # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 25 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 26 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 27 # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 28 # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 29 # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 30 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 31 """Tests for util module.""" 32 33 from __future__ import absolute_import 34 from __future__ import print_function 35 36 import os 37 import random 38 import unittest 39 40 from six import int2byte, PY3 41 from six.moves import range 42 43 import set_sys_path # Update sys.path to locate pywebsocket3 module. 44 from pywebsocket3 import util 45 46 _TEST_DATA_DIR = os.path.join(os.path.dirname(__file__), 'testdata') 47 48 49 class UtilTest(unittest.TestCase): 50 """A unittest for util module.""" 51 def test_prepend_message_to_exception(self): 52 exc = Exception('World') 53 self.assertEqual('World', str(exc)) 54 util.prepend_message_to_exception('Hello ', exc) 55 self.assertEqual('Hello World', str(exc)) 56 57 def test_get_script_interp(self): 58 cygwin_path = 'c:\\cygwin\\bin' 59 cygwin_perl = os.path.join(cygwin_path, 'perl') 60 self.assertEqual( 61 None, util.get_script_interp(os.path.join(_TEST_DATA_DIR, 62 'README'))) 63 self.assertEqual( 64 None, 65 util.get_script_interp(os.path.join(_TEST_DATA_DIR, 'README'), 66 cygwin_path)) 67 self.assertEqual( 68 '/usr/bin/perl -wT', 69 util.get_script_interp(os.path.join(_TEST_DATA_DIR, 'hello.pl'))) 70 self.assertEqual( 71 cygwin_perl + ' -wT', 72 util.get_script_interp(os.path.join(_TEST_DATA_DIR, 'hello.pl'), 73 cygwin_path)) 74 75 def test_hexify(self): 76 self.assertEqual('61 7a 41 5a 30 39 20 09 0d 0a 00 ff', 77 util.hexify(b'azAZ09 \t\r\n\x00\xff')) 78 79 80 class RepeatedXorMaskerTest(unittest.TestCase): 81 """A unittest for RepeatedXorMasker class.""" 82 def test_mask(self): 83 # Sample input e6,97,a5 is U+65e5 in UTF-8 84 masker = util.RepeatedXorMasker(b'\xff\xff\xff\xff') 85 result = masker.mask(b'\xe6\x97\xa5') 86 self.assertEqual(b'\x19\x68\x5a', result) 87 88 masker = util.RepeatedXorMasker(b'\x00\x00\x00\x00') 89 result = masker.mask(b'\xe6\x97\xa5') 90 self.assertEqual(b'\xe6\x97\xa5', result) 91 92 masker = util.RepeatedXorMasker(b'\xe6\x97\xa5\x20') 93 result = masker.mask(b'\xe6\x97\xa5') 94 self.assertEqual(b'\x00\x00\x00', result) 95 96 def test_mask_twice(self): 97 masker = util.RepeatedXorMasker(b'\x00\x7f\xff\x20') 98 # mask[0], mask[1], ... will be used. 99 result = masker.mask(b'\x00\x00\x00\x00\x00') 100 self.assertEqual(b'\x00\x7f\xff\x20\x00', result) 101 # mask[2], mask[0], ... will be used for the next call. 102 result = masker.mask(b'\x00\x00\x00\x00\x00') 103 self.assertEqual(b'\x7f\xff\x20\x00\x7f', result) 104 105 def test_mask_large_data(self): 106 masker = util.RepeatedXorMasker(b'mASk') 107 original = b''.join([util.pack_byte(i % 256) for i in range(1000)]) 108 result = masker.mask(original) 109 expected = b''.join([ 110 util.pack_byte((i % 256) ^ ord('mASk'[i % 4])) for i in range(1000) 111 ]) 112 self.assertEqual(expected, result) 113 114 masker = util.RepeatedXorMasker(b'MaSk') 115 first_part = b'The WebSocket Protocol enables two-way communication.' 116 result = masker.mask(first_part) 117 self.assertEqual( 118 b'\x19\t6K\x1a\x0418"\x028\x0e9A\x03\x19"\x15<\x08"\rs\x0e#' 119 b'\x001\x07(\x12s\x1f:\x0e~\x1c,\x18s\x08"\x0c>\x1e#\x080\n9' 120 b'\x08<\x05c', result) 121 second_part = b'It has two parts: a handshake and the data transfer.' 122 result = masker.mask(second_part) 123 self.assertEqual( 124 b"('K%\x00 K9\x16<K=\x00!\x1f>[s\nm\t2\x05)\x12;\n&\x04s\n#" 125 b"\x05s\x1f%\x04s\x0f,\x152K9\x132\x05>\x076\x19c", result) 126 127 128 def get_random_section(source, min_num_chunks): 129 chunks = [] 130 bytes_chunked = 0 131 132 while bytes_chunked < len(source): 133 chunk_size = random.randint( 134 1, min(len(source) / min_num_chunks, 135 len(source) - bytes_chunked)) 136 chunk = source[bytes_chunked:bytes_chunked + chunk_size] 137 chunks.append(chunk) 138 bytes_chunked += chunk_size 139 140 return chunks 141 142 143 class InflaterDeflaterTest(unittest.TestCase): 144 """A unittest for _Inflater and _Deflater class.""" 145 def test_inflate_deflate_default(self): 146 input = b'hello' + b'-' * 30000 + b'hello' 147 inflater15 = util._Inflater(15) 148 deflater15 = util._Deflater(15) 149 inflater8 = util._Inflater(8) 150 deflater8 = util._Deflater(8) 151 152 compressed15 = deflater15.compress_and_finish(input) 153 compressed8 = deflater8.compress_and_finish(input) 154 155 inflater15.append(compressed15) 156 inflater8.append(compressed8) 157 158 self.assertNotEqual(compressed15, compressed8) 159 self.assertEqual(input, inflater15.decompress(-1)) 160 self.assertEqual(input, inflater8.decompress(-1)) 161 162 def test_random_section(self): 163 random.seed(a=0) 164 source = b''.join( 165 [int2byte(random.randint(0, 255)) for i in range(100 * 1024)]) 166 167 chunked_input = get_random_section(source, 10) 168 169 deflater = util._Deflater(15) 170 compressed = [] 171 for chunk in chunked_input: 172 compressed.append(deflater.compress(chunk)) 173 compressed.append(deflater.compress_and_finish(b'')) 174 175 chunked_expectation = get_random_section(source, 10) 176 177 inflater = util._Inflater(15) 178 inflater.append(b''.join(compressed)) 179 for chunk in chunked_expectation: 180 decompressed = inflater.decompress(len(chunk)) 181 self.assertEqual(chunk, decompressed) 182 183 self.assertEqual(b'', inflater.decompress(-1)) 184 185 186 if __name__ == '__main__': 187 unittest.main() 188 189 # vi:sts=4 sw=4 et