test_hpack.py (28644B)
1 # -*- coding: utf-8 -*- 2 import itertools 3 import pytest 4 5 from hypothesis import given 6 from hypothesis.strategies import text, binary, sets, one_of 7 8 from hpack import ( 9 Encoder, 10 Decoder, 11 HeaderTuple, 12 NeverIndexedHeaderTuple, 13 HPACKDecodingError, 14 InvalidTableIndex, 15 OversizedHeaderListError, 16 InvalidTableSizeError, 17 ) 18 from hpack.hpack import _dict_to_iterable, _to_bytes 19 20 21 class TestHPACKEncoder: 22 # These tests are stolen entirely from the IETF specification examples. 23 def test_literal_header_field_with_indexing(self): 24 """ 25 The header field representation uses a literal name and a literal 26 value. 27 """ 28 e = Encoder() 29 header_set = {'custom-key': 'custom-header'} 30 result = b'\x40\x0acustom-key\x0dcustom-header' 31 32 assert e.encode(header_set, huffman=False) == result 33 assert list(e.header_table.dynamic_entries) == [ 34 (n.encode('utf-8'), v.encode('utf-8')) 35 for n, v in header_set.items() 36 ] 37 38 def test_sensitive_headers(self): 39 """ 40 Test encoding header values 41 """ 42 e = Encoder() 43 result = (b'\x82\x14\x88\x63\xa1\xa9' + 44 b'\x32\x08\x73\xd0\xc7\x10' + 45 b'\x87\x25\xa8\x49\xe9\xea' + 46 b'\x5f\x5f\x89\x41\x6a\x41' + 47 b'\x92\x6e\xe5\x35\x52\x9f') 48 header_set = [ 49 (':method', 'GET', True), 50 (':path', '/jimiscool/', True), 51 ('customkey', 'sensitiveinfo', True), 52 ] 53 assert e.encode(header_set, huffman=True) == result 54 55 def test_non_sensitive_headers_with_header_tuples(self): 56 """ 57 A header field stored in a HeaderTuple emits a representation that 58 allows indexing. 59 """ 60 e = Encoder() 61 result = (b'\x82\x44\x88\x63\xa1\xa9' + 62 b'\x32\x08\x73\xd0\xc7\x40' + 63 b'\x87\x25\xa8\x49\xe9\xea' + 64 b'\x5f\x5f\x89\x41\x6a\x41' + 65 b'\x92\x6e\xe5\x35\x52\x9f') 66 header_set = [ 67 HeaderTuple(':method', 'GET'), 68 HeaderTuple(':path', '/jimiscool/'), 69 HeaderTuple('customkey', 'sensitiveinfo'), 70 ] 71 assert e.encode(header_set, huffman=True) == result 72 73 def test_sensitive_headers_with_header_tuples(self): 74 """ 75 A header field stored in a NeverIndexedHeaderTuple emits a 76 representation that forbids indexing. 77 """ 78 e = Encoder() 79 result = (b'\x82\x14\x88\x63\xa1\xa9' + 80 b'\x32\x08\x73\xd0\xc7\x10' + 81 b'\x87\x25\xa8\x49\xe9\xea' + 82 b'\x5f\x5f\x89\x41\x6a\x41' + 83 b'\x92\x6e\xe5\x35\x52\x9f') 84 header_set = [ 85 NeverIndexedHeaderTuple(':method', 'GET'), 86 NeverIndexedHeaderTuple(':path', '/jimiscool/'), 87 NeverIndexedHeaderTuple('customkey', 'sensitiveinfo'), 88 ] 89 assert e.encode(header_set, huffman=True) == result 90 91 def test_header_table_size_getter(self): 92 e = Encoder() 93 assert e.header_table_size == 4096 94 95 def test_indexed_literal_header_field_with_indexing(self): 96 """ 97 The header field representation uses an indexed name and a literal 98 value and performs incremental indexing. 99 """ 100 e = Encoder() 101 header_set = {':path': '/sample/path'} 102 result = b'\x44\x0c/sample/path' 103 104 assert e.encode(header_set, huffman=False) == result 105 assert list(e.header_table.dynamic_entries) == [ 106 (n.encode('utf-8'), v.encode('utf-8')) 107 for n, v in header_set.items() 108 ] 109 110 def test_indexed_header_field(self): 111 """ 112 The header field representation uses an indexed header field, from 113 the static table. 114 """ 115 e = Encoder() 116 header_set = {':method': 'GET'} 117 result = b'\x82' 118 119 assert e.encode(header_set, huffman=False) == result 120 assert list(e.header_table.dynamic_entries) == [] 121 122 def test_indexed_header_field_from_static_table(self): 123 e = Encoder() 124 e.header_table_size = 0 125 header_set = {':method': 'GET'} 126 result = b'\x82' 127 128 # Make sure we don't emit an encoding context update. 129 e.header_table.resized = False 130 131 assert e.encode(header_set, huffman=False) == result 132 assert list(e.header_table.dynamic_entries) == [] 133 134 def test_request_examples_without_huffman(self): 135 """ 136 This section shows several consecutive header sets, corresponding to 137 HTTP requests, on the same connection. 138 """ 139 e = Encoder() 140 first_header_set = [ 141 (':method', 'GET',), 142 (':scheme', 'http',), 143 (':path', '/',), 144 (':authority', 'www.example.com'), 145 ] 146 # We should have :authority in first_header_table since we index it 147 first_header_table = [(':authority', 'www.example.com')] 148 first_result = b'\x82\x86\x84\x41\x0fwww.example.com' 149 150 assert e.encode(first_header_set, huffman=False) == first_result 151 assert list(e.header_table.dynamic_entries) == [ 152 (n.encode('utf-8'), v.encode('utf-8')) 153 for n, v in first_header_table 154 ] 155 156 second_header_set = [ 157 (':method', 'GET',), 158 (':scheme', 'http',), 159 (':path', '/',), 160 (':authority', 'www.example.com',), 161 ('cache-control', 'no-cache'), 162 ] 163 second_header_table = [ 164 ('cache-control', 'no-cache'), 165 (':authority', 'www.example.com') 166 ] 167 second_result = b'\x82\x86\x84\xbeX\x08no-cache' 168 169 assert e.encode(second_header_set, huffman=False) == second_result 170 assert list(e.header_table.dynamic_entries) == [ 171 (n.encode('utf-8'), v.encode('utf-8')) 172 for n, v in second_header_table 173 ] 174 175 third_header_set = [ 176 (':method', 'GET',), 177 (':scheme', 'https',), 178 (':path', '/index.html',), 179 (':authority', 'www.example.com',), 180 ('custom-key', 'custom-value'), 181 ] 182 third_result = ( 183 b'\x82\x87\x85\xbf@\ncustom-key\x0ccustom-value' 184 ) 185 186 assert e.encode(third_header_set, huffman=False) == third_result 187 # Don't check the header table here, it's just too complex to be 188 # reliable. Check its length though. 189 assert len(e.header_table.dynamic_entries) == 3 190 191 def test_request_examples_with_huffman(self): 192 """ 193 This section shows the same examples as the previous section, but 194 using Huffman encoding for the literal values. 195 """ 196 e = Encoder() 197 first_header_set = [ 198 (':method', 'GET',), 199 (':scheme', 'http',), 200 (':path', '/',), 201 (':authority', 'www.example.com'), 202 ] 203 first_header_table = [(':authority', 'www.example.com')] 204 first_result = ( 205 b'\x82\x86\x84\x41\x8c\xf1\xe3\xc2\xe5\xf2:k\xa0\xab\x90\xf4\xff' 206 ) 207 208 assert e.encode(first_header_set, huffman=True) == first_result 209 assert list(e.header_table.dynamic_entries) == [ 210 (n.encode('utf-8'), v.encode('utf-8')) 211 for n, v in first_header_table 212 ] 213 214 second_header_table = [ 215 ('cache-control', 'no-cache'), 216 (':authority', 'www.example.com') 217 ] 218 second_header_set = [ 219 (':method', 'GET',), 220 (':scheme', 'http',), 221 (':path', '/',), 222 (':authority', 'www.example.com',), 223 ('cache-control', 'no-cache'), 224 ] 225 second_result = b'\x82\x86\x84\xbeX\x86\xa8\xeb\x10d\x9c\xbf' 226 227 assert e.encode(second_header_set, huffman=True) == second_result 228 assert list(e.header_table.dynamic_entries) == [ 229 (n.encode('utf-8'), v.encode('utf-8')) 230 for n, v in second_header_table 231 ] 232 233 third_header_set = [ 234 (':method', 'GET',), 235 (':scheme', 'https',), 236 (':path', '/index.html',), 237 (':authority', 'www.example.com',), 238 ('custom-key', 'custom-value'), 239 ] 240 third_result = ( 241 b'\x82\x87\x85\xbf' 242 b'@\x88%\xa8I\xe9[\xa9}\x7f\x89%\xa8I\xe9[\xb8\xe8\xb4\xbf' 243 ) 244 245 assert e.encode(third_header_set, huffman=True) == third_result 246 assert len(e.header_table.dynamic_entries) == 3 247 248 # These tests are custom, for hyper. 249 def test_resizing_header_table(self): 250 # We need to encode a substantial number of headers, to populate the 251 # header table. 252 e = Encoder() 253 header_set = [ 254 (':method', 'GET'), 255 (':scheme', 'https'), 256 (':path', '/some/path'), 257 (':authority', 'www.example.com'), 258 ('custom-key', 'custom-value'), 259 ( 260 "user-agent", 261 "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.8; rv:16.0) " 262 "Gecko/20100101 Firefox/16.0", 263 ), 264 ( 265 "accept", 266 "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;" 267 "q=0.8", 268 ), 269 ('X-Lukasa-Test', '88989'), 270 ] 271 e.encode(header_set, huffman=True) 272 273 # Resize the header table to a size so small that nothing can be in it. 274 e.header_table_size = 40 275 assert len(e.header_table.dynamic_entries) == 0 276 277 def test_resizing_header_table_sends_multiple_updates(self): 278 e = Encoder() 279 280 e.header_table_size = 40 281 e.header_table_size = 100 282 e.header_table_size = 40 283 284 header_set = [(':method', 'GET')] 285 out = e.encode(header_set, huffman=True) 286 assert out == b'\x3F\x09\x3F\x45\x3F\x09\x82' 287 288 def test_resizing_header_table_to_same_size_ignored(self): 289 e = Encoder() 290 291 # These size changes should be ignored 292 e.header_table_size = 4096 293 e.header_table_size = 4096 294 e.header_table_size = 4096 295 296 # These size changes should be encoded 297 e.header_table_size = 40 298 e.header_table_size = 100 299 e.header_table_size = 40 300 301 header_set = [(':method', 'GET')] 302 out = e.encode(header_set, huffman=True) 303 assert out == b'\x3F\x09\x3F\x45\x3F\x09\x82' 304 305 def test_resizing_header_table_sends_context_update(self): 306 e = Encoder() 307 308 # Resize the header table to a size so small that nothing can be in it. 309 e.header_table_size = 40 310 311 # Now, encode a header set. Just a small one, with a well-defined 312 # output. 313 header_set = [(':method', 'GET')] 314 out = e.encode(header_set, huffman=True) 315 316 assert out == b'?\t\x82' 317 318 def test_setting_table_size_to_the_same_does_nothing(self): 319 e = Encoder() 320 321 # Set the header table size to the default. 322 e.header_table_size = 4096 323 324 # Now encode a header set. Just a small one, with a well-defined 325 # output. 326 header_set = [(':method', 'GET')] 327 out = e.encode(header_set, huffman=True) 328 329 assert out == b'\x82' 330 331 def test_evicting_header_table_objects(self): 332 e = Encoder() 333 334 # Set the header table size large enough to include one header. 335 e.header_table_size = 66 336 header_set = [('a', 'b'), ('long-custom-header', 'longish value')] 337 e.encode(header_set) 338 339 assert len(e.header_table.dynamic_entries) == 1 340 341 def test_headers_generator(self): 342 e = Encoder() 343 344 def headers_generator(): 345 return (("k" + str(i), "v" + str(i)) for i in range(3)) 346 347 header_set = headers_generator() 348 out = e.encode(header_set) 349 assert Decoder().decode(out) == list(headers_generator()) 350 351 352 class TestHPACKDecoder: 353 # These tests are stolen entirely from the IETF specification examples. 354 def test_literal_header_field_with_indexing(self): 355 """ 356 The header field representation uses a literal name and a literal 357 value. 358 """ 359 d = Decoder() 360 header_set = [('custom-key', 'custom-header')] 361 data = b'\x40\x0acustom-key\x0dcustom-header' 362 363 assert d.decode(data) == header_set 364 assert list(d.header_table.dynamic_entries) == [ 365 (n.encode('utf-8'), v.encode('utf-8')) for n, v in header_set 366 ] 367 368 def test_raw_decoding(self): 369 """ 370 The header field representation is decoded as a raw byte string instead 371 of UTF-8 372 """ 373 d = Decoder() 374 header_set = [ 375 (b'\x00\x01\x99\x30\x11\x22\x55\x21\x89\x14', b'custom-header') 376 ] 377 data = ( 378 b'\x40\x0a\x00\x01\x99\x30\x11\x22\x55\x21\x89\x14\x0d' 379 b'custom-header' 380 ) 381 382 assert d.decode(data, raw=True) == header_set 383 384 def test_literal_header_field_without_indexing(self): 385 """ 386 The header field representation uses an indexed name and a literal 387 value. 388 """ 389 d = Decoder() 390 header_set = [(':path', '/sample/path')] 391 data = b'\x04\x0c/sample/path' 392 393 assert d.decode(data) == header_set 394 assert list(d.header_table.dynamic_entries) == [] 395 396 def test_header_table_size_getter(self): 397 d = Decoder() 398 assert d.header_table_size 399 400 def test_indexed_header_field(self): 401 """ 402 The header field representation uses an indexed header field, from 403 the static table. 404 """ 405 d = Decoder() 406 header_set = [(':method', 'GET')] 407 data = b'\x82' 408 409 assert d.decode(data) == header_set 410 assert list(d.header_table.dynamic_entries) == [] 411 412 def test_request_examples_without_huffman(self): 413 """ 414 This section shows several consecutive header sets, corresponding to 415 HTTP requests, on the same connection. 416 """ 417 d = Decoder() 418 first_header_set = [ 419 (':method', 'GET',), 420 (':scheme', 'http',), 421 (':path', '/',), 422 (':authority', 'www.example.com'), 423 ] 424 # The first_header_table doesn't contain 'authority' 425 first_data = b'\x82\x86\x84\x01\x0fwww.example.com' 426 427 assert d.decode(first_data) == first_header_set 428 assert list(d.header_table.dynamic_entries) == [] 429 430 # This request takes advantage of the differential encoding of header 431 # sets. 432 second_header_set = [ 433 (':method', 'GET',), 434 (':scheme', 'http',), 435 (':path', '/',), 436 (':authority', 'www.example.com',), 437 ('cache-control', 'no-cache'), 438 ] 439 second_data = ( 440 b'\x82\x86\x84\x01\x0fwww.example.com\x0f\t\x08no-cache' 441 ) 442 443 assert d.decode(second_data) == second_header_set 444 assert list(d.header_table.dynamic_entries) == [] 445 446 third_header_set = [ 447 (':method', 'GET',), 448 (':scheme', 'https',), 449 (':path', '/index.html',), 450 (':authority', 'www.example.com',), 451 ('custom-key', 'custom-value'), 452 ] 453 third_data = ( 454 b'\x82\x87\x85\x01\x0fwww.example.com@\ncustom-key\x0ccustom-value' 455 ) 456 457 assert d.decode(third_data) == third_header_set 458 # Don't check the header table here, it's just too complex to be 459 # reliable. Check its length though. 460 assert len(d.header_table.dynamic_entries) == 1 461 462 def test_request_examples_with_huffman(self): 463 """ 464 This section shows the same examples as the previous section, but 465 using Huffman encoding for the literal values. 466 """ 467 d = Decoder() 468 469 first_header_set = [ 470 (':method', 'GET',), 471 (':scheme', 'http',), 472 (':path', '/',), 473 (':authority', 'www.example.com'), 474 ] 475 first_data = ( 476 b'\x82\x86\x84\x01\x8c\xf1\xe3\xc2\xe5\xf2:k\xa0\xab\x90\xf4\xff' 477 ) 478 479 assert d.decode(first_data) == first_header_set 480 assert list(d.header_table.dynamic_entries) == [] 481 482 second_header_set = [ 483 (':method', 'GET',), 484 (':scheme', 'http',), 485 (':path', '/',), 486 (':authority', 'www.example.com',), 487 ('cache-control', 'no-cache'), 488 ] 489 second_data = ( 490 b'\x82\x86\x84\x01\x8c\xf1\xe3\xc2\xe5\xf2:k\xa0\xab\x90\xf4\xff' 491 b'\x0f\t\x86\xa8\xeb\x10d\x9c\xbf' 492 ) 493 494 assert d.decode(second_data) == second_header_set 495 assert list(d.header_table.dynamic_entries) == [] 496 497 third_header_set = [ 498 (':method', 'GET',), 499 (':scheme', 'https',), 500 (':path', '/index.html',), 501 (':authority', 'www.example.com',), 502 ('custom-key', 'custom-value'), 503 ] 504 third_data = ( 505 b'\x82\x87\x85\x01\x8c\xf1\xe3\xc2\xe5\xf2:k\xa0\xab\x90\xf4\xff@' 506 b'\x88%\xa8I\xe9[\xa9}\x7f\x89%\xa8I\xe9[\xb8\xe8\xb4\xbf' 507 ) 508 509 assert d.decode(third_data) == third_header_set 510 assert len(d.header_table.dynamic_entries) == 1 511 512 # These tests are custom, for hyper. 513 def test_resizing_header_table(self): 514 # We need to decode a substantial number of headers, to populate the 515 # header table. This string isn't magic: it's the output from the 516 # equivalent test for the Encoder. 517 d = Decoder() 518 data = ( 519 b'\x82\x87D\x87a\x07\xa4\xacV4\xcfA\x8c\xf1\xe3\xc2\xe5\xf2:k\xa0' 520 b'\xab\x90\xf4\xff@\x88%\xa8I\xe9[\xa9}\x7f\x89%\xa8I\xe9[\xb8\xe8' 521 b'\xb4\xbfz\xbc\xd0\x7ff\xa2\x81\xb0\xda\xe0S\xfa\xd02\x1a\xa4\x9d' 522 b'\x13\xfd\xa9\x92\xa4\x96\x854\x0c\x8aj\xdc\xa7\xe2\x81\x02\xef}' 523 b'\xa9g{\x81qp\x7fjb):\x9d\x81\x00 \x00@\x150\x9a\xc2\xca\x7f,\x05' 524 b'\xc5\xc1S\xb0I|\xa5\x89\xd3M\x1fC\xae\xba\x0cA\xa4\xc7\xa9\x8f3' 525 b'\xa6\x9a?\xdf\x9ah\xfa\x1du\xd0b\r&=Ly\xa6\x8f\xbe\xd0\x01w\xfe' 526 b'\xbeX\xf9\xfb\xed\x00\x17{@\x8a\xfc[=\xbdF\x81\xad\xbc\xa8O\x84y' 527 b'\xe7\xde\x7f' 528 ) 529 d.decode(data) 530 531 # Resize the header table to a size so small that nothing can be in it. 532 d.header_table_size = 40 533 assert len(d.header_table.dynamic_entries) == 0 534 535 def test_apache_trafficserver(self): 536 # This test reproduces the bug in #110, using exactly the same header 537 # data. 538 d = Decoder() 539 data = ( 540 b'\x10\x07:status\x03200@\x06server\tATS/6.0.0' 541 b'@\x04date\x1dTue, 31 Mar 2015 08:09:51 GMT' 542 b'@\x0ccontent-type\ttext/html@\x0econtent-length\x0542468' 543 b'@\rlast-modified\x1dTue, 31 Mar 2015 01:55:51 GMT' 544 b'@\x04vary\x0fAccept-Encoding@\x04etag\x0f"5519fea7-a5e4"' 545 b'@\x08x-served\x05Nginx@\x14x-subdomain-tryfiles\x04True' 546 b'@\x07x-deity\thydra-lts@\raccept-ranges\x05bytes@\x03age\x010' 547 b'@\x19strict-transport-security\rmax-age=86400' 548 b'@\x03via2https/1.1 ATS (ApacheTrafficServer/6.0.0 [cSsNfU])' 549 ) 550 expect = [ 551 (':status', '200'), 552 ('server', 'ATS/6.0.0'), 553 ('date', 'Tue, 31 Mar 2015 08:09:51 GMT'), 554 ('content-type', 'text/html'), 555 ('content-length', '42468'), 556 ('last-modified', 'Tue, 31 Mar 2015 01:55:51 GMT'), 557 ('vary', 'Accept-Encoding'), 558 ('etag', '"5519fea7-a5e4"'), 559 ('x-served', 'Nginx'), 560 ('x-subdomain-tryfiles', 'True'), 561 ('x-deity', 'hydra-lts'), 562 ('accept-ranges', 'bytes'), 563 ('age', '0'), 564 ('strict-transport-security', 'max-age=86400'), 565 ('via', 'https/1.1 ATS (ApacheTrafficServer/6.0.0 [cSsNfU])'), 566 ] 567 568 result = d.decode(data) 569 570 assert result == expect 571 # The status header shouldn't be indexed. 572 assert len(d.header_table.dynamic_entries) == len(expect) - 1 573 574 def test_utf8_errors_raise_hpack_decoding_error(self): 575 d = Decoder() 576 577 # Invalid UTF-8 data. 578 data = b'\x82\x86\x84\x01\x10www.\x07\xaa\xd7\x95\xd7\xa8\xd7\x94.com' 579 580 with pytest.raises(HPACKDecodingError): 581 d.decode(data) 582 583 def test_invalid_indexed_literal(self): 584 d = Decoder() 585 586 # Refer to an index that is too large. 587 data = b'\x82\x86\x84\x7f\x0a\x0fwww.example.com' 588 with pytest.raises(InvalidTableIndex): 589 d.decode(data) 590 591 def test_invalid_indexed_header(self): 592 d = Decoder() 593 594 # Refer to an indexed header that is too large. 595 data = b'\xBE\x86\x84\x01\x0fwww.example.com' 596 with pytest.raises(InvalidTableIndex): 597 d.decode(data) 598 599 def test_literal_header_field_with_indexing_emits_headertuple(self): 600 """ 601 A header field with indexing emits a HeaderTuple. 602 """ 603 d = Decoder() 604 data = b'\x00\x0acustom-key\x0dcustom-header' 605 606 headers = d.decode(data) 607 assert len(headers) == 1 608 609 header = headers[0] 610 assert isinstance(header, HeaderTuple) 611 assert not isinstance(header, NeverIndexedHeaderTuple) 612 613 def test_literal_never_indexed_emits_neverindexedheadertuple(self): 614 """ 615 A literal header field that must never be indexed emits a 616 NeverIndexedHeaderTuple. 617 """ 618 d = Decoder() 619 data = b'\x10\x0acustom-key\x0dcustom-header' 620 621 headers = d.decode(data) 622 assert len(headers) == 1 623 624 header = headers[0] 625 assert isinstance(header, NeverIndexedHeaderTuple) 626 627 def test_indexed_never_indexed_emits_neverindexedheadertuple(self): 628 """ 629 A header field with an indexed name that must never be indexed emits a 630 NeverIndexedHeaderTuple. 631 """ 632 d = Decoder() 633 data = b'\x14\x0c/sample/path' 634 635 headers = d.decode(data) 636 assert len(headers) == 1 637 638 header = headers[0] 639 assert isinstance(header, NeverIndexedHeaderTuple) 640 641 def test_max_header_list_size(self): 642 """ 643 If the header block is larger than the max_header_list_size, the HPACK 644 decoder throws an OversizedHeaderListError. 645 """ 646 d = Decoder(max_header_list_size=44) 647 data = b'\x14\x0c/sample/path' 648 649 with pytest.raises(OversizedHeaderListError): 650 d.decode(data) 651 652 def test_can_decode_multiple_header_table_size_changes(self): 653 """ 654 If multiple header table size changes are sent in at once, they are 655 successfully decoded. 656 """ 657 d = Decoder() 658 data = b'?a?\xe1\x1f\x82\x87\x84A\x8a\x08\x9d\\\x0b\x81p\xdcy\xa6\x99' 659 expect = [ 660 (':method', 'GET'), 661 (':scheme', 'https'), 662 (':path', '/'), 663 (':authority', '127.0.0.1:8443') 664 ] 665 666 assert d.decode(data) == expect 667 668 def test_header_table_size_change_above_maximum(self): 669 """ 670 If a header table size change is received that exceeds the maximum 671 allowed table size, it is rejected. 672 """ 673 d = Decoder() 674 d.max_allowed_table_size = 127 675 data = b'?a\x82\x87\x84A\x8a\x08\x9d\\\x0b\x81p\xdcy\xa6\x99' 676 677 with pytest.raises(InvalidTableSizeError): 678 d.decode(data) 679 680 def test_table_size_not_adjusting(self): 681 """ 682 If the header table size is shrunk, and then the remote peer doesn't 683 join in the shrinking, then an error is raised. 684 """ 685 d = Decoder() 686 d.max_allowed_table_size = 128 687 data = b'\x82\x87\x84A\x8a\x08\x9d\\\x0b\x81p\xdcy\xa6\x99' 688 689 with pytest.raises(InvalidTableSizeError): 690 d.decode(data) 691 692 def test_table_size_last_rejected(self): 693 """ 694 If a header table size change comes last in the header block, it is 695 forbidden. 696 """ 697 d = Decoder() 698 data = b'\x82\x87\x84A\x8a\x08\x9d\\\x0b\x81p\xdcy\xa6\x99?a' 699 700 with pytest.raises(HPACKDecodingError): 701 d.decode(data) 702 703 def test_table_size_middle_rejected(self): 704 """ 705 If a header table size change comes anywhere but first in the header 706 block, it is forbidden. 707 """ 708 d = Decoder() 709 data = b'\x82?a\x87\x84A\x8a\x08\x9d\\\x0b\x81p\xdcy\xa6\x99' 710 711 with pytest.raises(HPACKDecodingError): 712 d.decode(data) 713 714 def test_truncated_header_name(self): 715 """ 716 If a header name is truncated an error is raised. 717 """ 718 d = Decoder() 719 # This is a simple header block that has a bad ending. The interesting 720 # part begins on the second line. This indicates a string that has 721 # literal name and value. The name is a 5 character huffman-encoded 722 # string that is only three bytes long. 723 data = ( 724 b'\x82\x87\x84A\x8a\x08\x9d\\\x0b\x81p\xdcy\xa6\x99' 725 b'\x00\x85\xf2\xb2J' 726 ) 727 728 with pytest.raises(HPACKDecodingError): 729 d.decode(data) 730 731 def test_truncated_header_value(self): 732 """ 733 If a header value is truncated an error is raised. 734 """ 735 d = Decoder() 736 # This is a simple header block that has a bad ending. The interesting 737 # part begins on the second line. This indicates a string that has 738 # literal name and value. The name is a 5 character huffman-encoded 739 # string, but the entire EOS character has been written over the end. 740 # This causes hpack to see the header value as being supposed to be 741 # 622462 bytes long, which it clearly is not, and so this must fail. 742 data = ( 743 b'\x82\x87\x84A\x8a\x08\x9d\\\x0b\x81p\xdcy\xa6\x99' 744 b'\x00\x85\xf2\xb2J\x87\xff\xff\xff\xfd%B\x7f' 745 ) 746 747 with pytest.raises(HPACKDecodingError): 748 d.decode(data) 749 750 751 class TestDictToIterable: 752 """ 753 The dict_to_iterable function has some subtle requirements: validates that 754 everything behaves as expected. 755 756 As much as possible this tries to be exhaustive. 757 """ 758 keys = one_of( 759 text().filter(lambda k: k and not k.startswith(':')), 760 binary().filter(lambda k: k and not k.startswith(b':')) 761 ) 762 763 @given( 764 special_keys=sets(keys), 765 boring_keys=sets(keys), 766 ) 767 def test_ordering(self, special_keys, boring_keys): 768 """ 769 _dict_to_iterable produces an iterable where all the keys beginning 770 with a colon are emitted first. 771 """ 772 def _prepend_colon(k): 773 if isinstance(k, str): 774 return ':' + k 775 else: 776 return b':' + k 777 778 special_keys = set(map(_prepend_colon, special_keys)) 779 input_dict = { 780 k: b'testval' for k in itertools.chain( 781 special_keys, 782 boring_keys 783 ) 784 } 785 filtered = _dict_to_iterable(input_dict) 786 787 received_special = set() 788 received_boring = set() 789 790 for _ in special_keys: 791 k, _ = next(filtered) 792 received_special.add(k) 793 for _ in boring_keys: 794 k, _ = next(filtered) 795 received_boring.add(k) 796 797 assert special_keys == received_special 798 assert boring_keys == received_boring 799 800 @given( 801 special_keys=sets(keys), 802 boring_keys=sets(keys), 803 ) 804 def test_ordering_applies_to_encoding(self, special_keys, boring_keys): 805 """ 806 When encoding a dictionary the special keys all appear first. 807 """ 808 def _prepend_colon(k): 809 if isinstance(k, str): 810 return ':' + k 811 else: 812 return b':' + k 813 814 special_keys = set(map(_prepend_colon, special_keys)) 815 input_dict = { 816 k: b'testval' for k in itertools.chain( 817 special_keys, 818 boring_keys 819 ) 820 } 821 e = Encoder() 822 d = Decoder() 823 encoded = e.encode(input_dict) 824 decoded = iter(d.decode(encoded, raw=True)) 825 826 received_special = set() 827 received_boring = set() 828 expected_special = set(map(_to_bytes, special_keys)) 829 expected_boring = set(map(_to_bytes, boring_keys)) 830 831 for _ in special_keys: 832 k, _ = next(decoded) 833 received_special.add(k) 834 for _ in boring_keys: 835 k, _ = next(decoded) 836 received_boring.add(k) 837 838 assert expected_special == received_special 839 assert expected_boring == received_boring