tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

test_hpack.py (28644B)


      1 # -*- coding: utf-8 -*-
      2 import itertools
      3 import pytest
      4 
      5 from hypothesis import given
      6 from hypothesis.strategies import text, binary, sets, one_of
      7 
      8 from hpack import (
      9    Encoder,
     10    Decoder,
     11    HeaderTuple,
     12    NeverIndexedHeaderTuple,
     13    HPACKDecodingError,
     14    InvalidTableIndex,
     15    OversizedHeaderListError,
     16    InvalidTableSizeError,
     17 )
     18 from hpack.hpack import _dict_to_iterable, _to_bytes
     19 
     20 
     21 class TestHPACKEncoder:
     22    # These tests are stolen entirely from the IETF specification examples.
     23    def test_literal_header_field_with_indexing(self):
     24        """
     25        The header field representation uses a literal name and a literal
     26        value.
     27        """
     28        e = Encoder()
     29        header_set = {'custom-key': 'custom-header'}
     30        result = b'\x40\x0acustom-key\x0dcustom-header'
     31 
     32        assert e.encode(header_set, huffman=False) == result
     33        assert list(e.header_table.dynamic_entries) == [
     34            (n.encode('utf-8'), v.encode('utf-8'))
     35            for n, v in header_set.items()
     36        ]
     37 
     38    def test_sensitive_headers(self):
     39        """
     40        Test encoding header values
     41        """
     42        e = Encoder()
     43        result = (b'\x82\x14\x88\x63\xa1\xa9' +
     44                  b'\x32\x08\x73\xd0\xc7\x10' +
     45                  b'\x87\x25\xa8\x49\xe9\xea' +
     46                  b'\x5f\x5f\x89\x41\x6a\x41' +
     47                  b'\x92\x6e\xe5\x35\x52\x9f')
     48        header_set = [
     49            (':method', 'GET', True),
     50            (':path', '/jimiscool/', True),
     51            ('customkey', 'sensitiveinfo', True),
     52        ]
     53        assert e.encode(header_set, huffman=True) == result
     54 
     55    def test_non_sensitive_headers_with_header_tuples(self):
     56        """
     57        A header field stored in a HeaderTuple emits a representation that
     58        allows indexing.
     59        """
     60        e = Encoder()
     61        result = (b'\x82\x44\x88\x63\xa1\xa9' +
     62                  b'\x32\x08\x73\xd0\xc7\x40' +
     63                  b'\x87\x25\xa8\x49\xe9\xea' +
     64                  b'\x5f\x5f\x89\x41\x6a\x41' +
     65                  b'\x92\x6e\xe5\x35\x52\x9f')
     66        header_set = [
     67            HeaderTuple(':method', 'GET'),
     68            HeaderTuple(':path', '/jimiscool/'),
     69            HeaderTuple('customkey', 'sensitiveinfo'),
     70        ]
     71        assert e.encode(header_set, huffman=True) == result
     72 
     73    def test_sensitive_headers_with_header_tuples(self):
     74        """
     75        A header field stored in a NeverIndexedHeaderTuple emits a
     76        representation that forbids indexing.
     77        """
     78        e = Encoder()
     79        result = (b'\x82\x14\x88\x63\xa1\xa9' +
     80                  b'\x32\x08\x73\xd0\xc7\x10' +
     81                  b'\x87\x25\xa8\x49\xe9\xea' +
     82                  b'\x5f\x5f\x89\x41\x6a\x41' +
     83                  b'\x92\x6e\xe5\x35\x52\x9f')
     84        header_set = [
     85            NeverIndexedHeaderTuple(':method', 'GET'),
     86            NeverIndexedHeaderTuple(':path', '/jimiscool/'),
     87            NeverIndexedHeaderTuple('customkey', 'sensitiveinfo'),
     88        ]
     89        assert e.encode(header_set, huffman=True) == result
     90 
     91    def test_header_table_size_getter(self):
     92        e = Encoder()
     93        assert e.header_table_size == 4096
     94 
     95    def test_indexed_literal_header_field_with_indexing(self):
     96        """
     97        The header field representation uses an indexed name and a literal
     98        value and performs incremental indexing.
     99        """
    100        e = Encoder()
    101        header_set = {':path': '/sample/path'}
    102        result = b'\x44\x0c/sample/path'
    103 
    104        assert e.encode(header_set, huffman=False) == result
    105        assert list(e.header_table.dynamic_entries) == [
    106            (n.encode('utf-8'), v.encode('utf-8'))
    107            for n, v in header_set.items()
    108        ]
    109 
    def test_indexed_header_field(self):
        """
        A header taken from the static table is emitted as a single indexed
        field and leaves the dynamic table empty.
        """
        encoder = Encoder()
        wire = encoder.encode({':method': 'GET'}, huffman=False)

        assert wire == b'\x82'
        assert not list(encoder.header_table.dynamic_entries)
    121 
    def test_indexed_header_field_from_static_table(self):
        """
        With the header table size set to zero, ``:method: GET`` still
        encodes as the single byte 0x82 and nothing enters the dynamic table.
        """
        encoder = Encoder()
        encoder.header_table_size = 0

        # Clear the resize flag so that no encoding context update is
        # emitted ahead of the header itself.
        encoder.header_table.resized = False

        wire = encoder.encode({':method': 'GET'}, huffman=False)

        assert wire == b'\x82'
        assert not list(encoder.header_table.dynamic_entries)
    133 
    def test_request_examples_without_huffman(self):
        """
        Encode three consecutive request header sets on the same encoder
        without Huffman coding, checking the bytes produced for each one.
        """
        def as_bytes(pairs):
            # Dynamic table entries are compared as UTF-8 encoded pairs.
            return [
                (name.encode('utf-8'), value.encode('utf-8'))
                for name, value in pairs
            ]

        encoder = Encoder()

        # First request: :authority is indexed, so it must be the only
        # dynamic table entry afterwards.
        request_one = [
            (':method', 'GET'),
            (':scheme', 'http'),
            (':path', '/'),
            (':authority', 'www.example.com'),
        ]
        wire_one = encoder.encode(request_one, huffman=False)
        assert wire_one == b'\x82\x86\x84\x41\x0fwww.example.com'
        assert list(encoder.header_table.dynamic_entries) == as_bytes(
            [(':authority', 'www.example.com')]
        )

        # Second request: adds cache-control, which lands ahead of
        # :authority in the dynamic table.
        request_two = [
            (':method', 'GET'),
            (':scheme', 'http'),
            (':path', '/'),
            (':authority', 'www.example.com'),
            ('cache-control', 'no-cache'),
        ]
        wire_two = encoder.encode(request_two, huffman=False)
        assert wire_two == b'\x82\x86\x84\xbeX\x08no-cache'
        assert list(encoder.header_table.dynamic_entries) == as_bytes(
            [
                ('cache-control', 'no-cache'),
                (':authority', 'www.example.com'),
            ]
        )

        # Third request: only the number of dynamic table entries is
        # checked, as the exact contents are too complex to pin reliably.
        request_three = [
            (':method', 'GET'),
            (':scheme', 'https'),
            (':path', '/index.html'),
            (':authority', 'www.example.com'),
            ('custom-key', 'custom-value'),
        ]
        wire_three = encoder.encode(request_three, huffman=False)
        assert wire_three == (
            b'\x82\x87\x85\xbf@\ncustom-key\x0ccustom-value'
        )
        assert len(encoder.header_table.dynamic_entries) == 3
    190 
    def test_request_examples_with_huffman(self):
        """
        Encode the same three consecutive request header sets, this time
        with Huffman coding enabled for the literal values.
        """
        def as_bytes(pairs):
            # Dynamic table entries are compared as UTF-8 encoded pairs.
            return [
                (name.encode('utf-8'), value.encode('utf-8'))
                for name, value in pairs
            ]

        encoder = Encoder()

        # First request: :authority becomes the sole dynamic table entry.
        request_one = [
            (':method', 'GET'),
            (':scheme', 'http'),
            (':path', '/'),
            (':authority', 'www.example.com'),
        ]
        wire_one = encoder.encode(request_one, huffman=True)
        assert wire_one == (
            b'\x82\x86\x84\x41\x8c\xf1\xe3\xc2\xe5\xf2:k\xa0\xab\x90\xf4\xff'
        )
        assert list(encoder.header_table.dynamic_entries) == as_bytes(
            [(':authority', 'www.example.com')]
        )

        # Second request: cache-control is added ahead of :authority.
        request_two = [
            (':method', 'GET'),
            (':scheme', 'http'),
            (':path', '/'),
            (':authority', 'www.example.com'),
            ('cache-control', 'no-cache'),
        ]
        wire_two = encoder.encode(request_two, huffman=True)
        assert wire_two == b'\x82\x86\x84\xbeX\x86\xa8\xeb\x10d\x9c\xbf'
        assert list(encoder.header_table.dynamic_entries) == as_bytes(
            [
                ('cache-control', 'no-cache'),
                (':authority', 'www.example.com'),
            ]
        )

        # Third request: only the dynamic table length is verified.
        request_three = [
            (':method', 'GET'),
            (':scheme', 'https'),
            (':path', '/index.html'),
            (':authority', 'www.example.com'),
            ('custom-key', 'custom-value'),
        ]
        wire_three = encoder.encode(request_three, huffman=True)
        assert wire_three == (
            b'\x82\x87\x85\xbf'
            b'@\x88%\xa8I\xe9[\xa9}\x7f\x89%\xa8I\xe9[\xb8\xe8\xb4\xbf'
        )
        assert len(encoder.header_table.dynamic_entries) == 3
    247 
    248    # These tests are custom, for hyper.
    def test_resizing_header_table(self):
        """
        Shrinking the header table size after encoding a number of headers
        leaves the dynamic table empty.
        """
        user_agent = (
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.8; rv:16.0) "
            "Gecko/20100101 Firefox/16.0"
        )
        accept = (
            "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;"
            "q=0.8"
        )

        # A substantial header set is encoded first so that the header
        # table gets populated.
        encoder = Encoder()
        encoder.encode(
            [
                (':method', 'GET'),
                (':scheme', 'https'),
                (':path', '/some/path'),
                (':authority', 'www.example.com'),
                ('custom-key', 'custom-value'),
                ("user-agent", user_agent),
                ("accept", accept),
                ('X-Lukasa-Test', '88989'),
            ],
            huffman=True,
        )

        # Shrink the table to a size at which no entry can remain in it.
        encoder.header_table_size = 40
        assert len(encoder.header_table.dynamic_entries) == 0
    276 
    def test_resizing_header_table_sends_multiple_updates(self):
        """
        Three table size changes made before encoding all show up, in
        order, at the front of the next encoded block.
        """
        encoder = Encoder()
        for new_size in (40, 100, 40):
            encoder.header_table_size = new_size

        wire = encoder.encode([(':method', 'GET')], huffman=True)
        assert wire == b'\x3F\x09\x3F\x45\x3F\x09\x82'
    287 
    def test_resizing_header_table_to_same_size_ignored(self):
        """
        Re-assigning the current table size produces no output, while real
        size changes are still encoded.
        """
        encoder = Encoder()

        # Assigning the existing size repeatedly should be ignored.
        for _ in range(3):
            encoder.header_table_size = 4096

        # Genuine size changes should each be encoded.
        for new_size in (40, 100, 40):
            encoder.header_table_size = new_size

        wire = encoder.encode([(':method', 'GET')], huffman=True)
        assert wire == b'\x3F\x09\x3F\x45\x3F\x09\x82'
    304 
    def test_resizing_header_table_sends_context_update(self):
        """
        After a table resize, the next encoded block begins with extra
        bytes ahead of the header itself.
        """
        encoder = Encoder()

        # Shrink the table to a size at which no entry can remain in it.
        encoder.header_table_size = 40

        # A minimal header set keeps the expected output easy to state.
        wire = encoder.encode([(':method', 'GET')], huffman=True)

        assert wire == b'?\t\x82'
    317 
    def test_setting_table_size_to_the_same_does_nothing(self):
        """
        Assigning the default table size leaves the encoded output free of
        any size-update prefix.
        """
        encoder = Encoder()

        # 4096 is the size the encoder already starts with.
        encoder.header_table_size = 4096

        # A minimal header set keeps the expected output easy to state.
        wire = encoder.encode([(':method', 'GET')], huffman=True)

        assert wire == b'\x82'
    330 
    def test_evicting_header_table_objects(self):
        """
        With a table size of 66, encoding two headers leaves exactly one
        entry in the dynamic table.
        """
        encoder = Encoder()

        # The size is chosen to be large enough for a single header only.
        encoder.header_table_size = 66

        encoder.encode([('a', 'b'), ('long-custom-header', 'longish value')])

        assert len(encoder.header_table.dynamic_entries) == 1
    340 
    def test_headers_generator(self):
        """
        Headers supplied through a generator are encoded, and decode back
        to the same sequence of pairs.
        """
        def make_headers():
            for index in range(3):
                yield "k" + str(index), "v" + str(index)

        wire = Encoder().encode(make_headers())

        assert Decoder().decode(wire) == list(make_headers())
    350 
    351 
    352 class TestHPACKDecoder:
    353    # These tests are stolen entirely from the IETF specification examples.
    def test_literal_header_field_with_indexing(self):
        """
        A field carrying a literal name and a literal value is decoded and
        then appears in the decoder's dynamic table.
        """
        wire = b'\x40\x0acustom-key\x0dcustom-header'
        expected = [('custom-key', 'custom-header')]

        decoder = Decoder()
        assert decoder.decode(wire) == expected

        stored = list(decoder.header_table.dynamic_entries)
        assert stored == [
            (name.encode('utf-8'), value.encode('utf-8'))
            for name, value in expected
        ]
    367 
    def test_raw_decoding(self):
        """
        With ``raw=True`` the decoder returns header names and values as
        byte strings rather than decoding them as UTF-8.
        """
        raw_name = b'\x00\x01\x99\x30\x11\x22\x55\x21\x89\x14'
        wire = (
            b'\x40\x0a\x00\x01\x99\x30\x11\x22\x55\x21\x89\x14\x0d'
            b'custom-header'
        )

        decoded = Decoder().decode(wire, raw=True)

        assert decoded == [(raw_name, b'custom-header')]
    383 
    def test_literal_header_field_without_indexing(self):
        """
        A field with an indexed name and a literal value is decoded without
        adding anything to the dynamic table.
        """
        decoder = Decoder()
        decoded = decoder.decode(b'\x04\x0c/sample/path')

        assert decoded == [(':path', '/sample/path')]
        assert not list(decoder.header_table.dynamic_entries)
    395 
    def test_header_table_size_getter(self):
        """
        A freshly constructed Decoder reports a header table size of 4096.

        The value is pinned exactly rather than merely checked for
        truthiness, so that a wrong non-zero default cannot slip through.
        """
        decoder = Decoder()
        assert decoder.header_table_size == 4096
    399 
    def test_indexed_header_field(self):
        """
        A single indexed field referring to the static table decodes to
        ``:method: GET`` and leaves the dynamic table empty.
        """
        decoder = Decoder()
        decoded = decoder.decode(b'\x82')

        assert decoded == [(':method', 'GET')]
        assert not list(decoder.header_table.dynamic_entries)
    411 
    def test_request_examples_without_huffman(self):
        """
        Decode three consecutive request header blocks on the same decoder,
        none of which use Huffman coding.
        """
        decoder = Decoder()

        base_request = [
            (':method', 'GET'),
            (':scheme', 'http'),
            (':path', '/'),
            (':authority', 'www.example.com'),
        ]

        # Neither of the first two blocks adds anything to the dynamic
        # table; in particular :authority is not stored.
        unindexed_cases = [
            (
                b'\x82\x86\x84\x01\x0fwww.example.com',
                base_request,
            ),
            (
                b'\x82\x86\x84\x01\x0fwww.example.com\x0f\t\x08no-cache',
                base_request + [('cache-control', 'no-cache')],
            ),
        ]
        for wire, expected in unindexed_cases:
            assert decoder.decode(wire) == expected
            assert list(decoder.header_table.dynamic_entries) == []

        third_wire = (
            b'\x82\x87\x85\x01\x0fwww.example.com@\ncustom-key\x0ccustom-value'
        )
        third_expected = [
            (':method', 'GET'),
            (':scheme', 'https'),
            (':path', '/index.html'),
            (':authority', 'www.example.com'),
            ('custom-key', 'custom-value'),
        ]
        assert decoder.decode(third_wire) == third_expected

        # The table contents are too complex to check reliably, so only
        # the number of entries is verified.
        assert len(decoder.header_table.dynamic_entries) == 1
    461 
    def test_request_examples_with_huffman(self):
        """
        Decode the same three consecutive request header blocks, this time
        with the literal values Huffman coded.
        """
        decoder = Decoder()

        base_request = [
            (':method', 'GET'),
            (':scheme', 'http'),
            (':path', '/'),
            (':authority', 'www.example.com'),
        ]

        # Neither of the first two blocks adds anything to the dynamic
        # table.
        unindexed_cases = [
            (
                b'\x82\x86\x84\x01\x8c\xf1\xe3\xc2\xe5\xf2:k\xa0\xab\x90\xf4'
                b'\xff',
                base_request,
            ),
            (
                b'\x82\x86\x84\x01\x8c\xf1\xe3\xc2\xe5\xf2:k\xa0\xab\x90\xf4'
                b'\xff'
                b'\x0f\t\x86\xa8\xeb\x10d\x9c\xbf',
                base_request + [('cache-control', 'no-cache')],
            ),
        ]
        for wire, expected in unindexed_cases:
            assert decoder.decode(wire) == expected
            assert list(decoder.header_table.dynamic_entries) == []

        third_wire = (
            b'\x82\x87\x85\x01\x8c\xf1\xe3\xc2\xe5\xf2:k\xa0\xab\x90\xf4\xff@'
            b'\x88%\xa8I\xe9[\xa9}\x7f\x89%\xa8I\xe9[\xb8\xe8\xb4\xbf'
        )
        third_expected = [
            (':method', 'GET'),
            (':scheme', 'https'),
            (':path', '/index.html'),
            (':authority', 'www.example.com'),
            ('custom-key', 'custom-value'),
        ]
        assert decoder.decode(third_wire) == third_expected
        assert len(decoder.header_table.dynamic_entries) == 1
    511 
    512    # These tests are custom, for hyper.
    def test_resizing_header_table(self):
        """
        Shrinking the header table size after decoding a large block leaves
        the dynamic table empty.
        """
        # A substantial block is decoded first so that the header table
        # gets populated. The bytes are not magic: they are the output of
        # the equivalent Encoder test.
        populated_block = (
            b'\x82\x87D\x87a\x07\xa4\xacV4\xcfA\x8c\xf1\xe3\xc2\xe5\xf2:k\xa0'
            b'\xab\x90\xf4\xff@\x88%\xa8I\xe9[\xa9}\x7f\x89%\xa8I\xe9[\xb8\xe8'
            b'\xb4\xbfz\xbc\xd0\x7ff\xa2\x81\xb0\xda\xe0S\xfa\xd02\x1a\xa4\x9d'
            b'\x13\xfd\xa9\x92\xa4\x96\x854\x0c\x8aj\xdc\xa7\xe2\x81\x02\xef}'
            b'\xa9g{\x81qp\x7fjb):\x9d\x81\x00 \x00@\x150\x9a\xc2\xca\x7f,\x05'
            b'\xc5\xc1S\xb0I|\xa5\x89\xd3M\x1fC\xae\xba\x0cA\xa4\xc7\xa9\x8f3'
            b'\xa6\x9a?\xdf\x9ah\xfa\x1du\xd0b\r&=Ly\xa6\x8f\xbe\xd0\x01w\xfe'
            b'\xbeX\xf9\xfb\xed\x00\x17{@\x8a\xfc[=\xbdF\x81\xad\xbc\xa8O\x84y'
            b'\xe7\xde\x7f'
        )
        decoder = Decoder()
        decoder.decode(populated_block)

        # Shrink the table to a size at which no entry can remain in it.
        decoder.header_table_size = 40
        assert len(decoder.header_table.dynamic_entries) == 0
    534 
    def test_apache_trafficserver(self):
        """
        Decode the exact header block from the bug reported in #110 and
        check both the headers and the resulting dynamic table size.
        """
        wire = (
            b'\x10\x07:status\x03200@\x06server\tATS/6.0.0'
            b'@\x04date\x1dTue, 31 Mar 2015 08:09:51 GMT'
            b'@\x0ccontent-type\ttext/html@\x0econtent-length\x0542468'
            b'@\rlast-modified\x1dTue, 31 Mar 2015 01:55:51 GMT'
            b'@\x04vary\x0fAccept-Encoding@\x04etag\x0f"5519fea7-a5e4"'
            b'@\x08x-served\x05Nginx@\x14x-subdomain-tryfiles\x04True'
            b'@\x07x-deity\thydra-lts@\raccept-ranges\x05bytes@\x03age\x010'
            b'@\x19strict-transport-security\rmax-age=86400'
            b'@\x03via2https/1.1 ATS (ApacheTrafficServer/6.0.0 [cSsNfU])'
        )
        expected = [
            (':status', '200'),
            ('server', 'ATS/6.0.0'),
            ('date', 'Tue, 31 Mar 2015 08:09:51 GMT'),
            ('content-type', 'text/html'),
            ('content-length', '42468'),
            ('last-modified', 'Tue, 31 Mar 2015 01:55:51 GMT'),
            ('vary', 'Accept-Encoding'),
            ('etag', '"5519fea7-a5e4"'),
            ('x-served', 'Nginx'),
            ('x-subdomain-tryfiles', 'True'),
            ('x-deity', 'hydra-lts'),
            ('accept-ranges', 'bytes'),
            ('age', '0'),
            ('strict-transport-security', 'max-age=86400'),
            ('via', 'https/1.1 ATS (ApacheTrafficServer/6.0.0 [cSsNfU])'),
        ]

        decoder = Decoder()
        assert decoder.decode(wire) == expected

        # Every header except :status should have been indexed.
        indexed_count = len(decoder.header_table.dynamic_entries)
        assert indexed_count == len(expected) - 1
    573 
    def test_utf8_errors_raise_hpack_decoding_error(self):
        """
        A header value that is not valid UTF-8 makes decoding fail with
        HPACKDecodingError.
        """
        # The :authority value below contains invalid UTF-8 data.
        malformed = (
            b'\x82\x86\x84\x01\x10www.\x07\xaa\xd7\x95\xd7\xa8\xd7\x94.com'
        )
        decoder = Decoder()

        with pytest.raises(HPACKDecodingError):
            decoder.decode(malformed)
    582 
    def test_invalid_indexed_literal(self):
        """
        A literal field whose name index is too large raises
        InvalidTableIndex.
        """
        # The name index in this block points past the end of the table.
        out_of_range = b'\x82\x86\x84\x7f\x0a\x0fwww.example.com'
        decoder = Decoder()

        with pytest.raises(InvalidTableIndex):
            decoder.decode(out_of_range)
    590 
    def test_invalid_indexed_header(self):
        """
        An indexed header field whose index is too large raises
        InvalidTableIndex.
        """
        # The leading indexed field points past the end of the table.
        out_of_range = b'\xBE\x86\x84\x01\x0fwww.example.com'
        decoder = Decoder()

        with pytest.raises(InvalidTableIndex):
            decoder.decode(out_of_range)
    598 
    def test_literal_header_field_with_indexing_emits_headertuple(self):
        """
        A literal header field that is not marked never-indexed emits a
        plain HeaderTuple.

        Two representations are exercised. The 0x40 prefix is a literal
        field with incremental indexing, which is what the test name
        describes. The 0x00 prefix is a literal field without indexing,
        which is the only case the test used to cover. Neither may produce
        a NeverIndexedHeaderTuple.
        """
        payload = b'\x0acustom-key\x0dcustom-header'

        for prefix in (b'\x40', b'\x00'):
            # A fresh decoder per case keeps the dynamic table state of one
            # representation from influencing the other.
            decoder = Decoder()

            headers = decoder.decode(prefix + payload)
            assert len(headers) == 1

            header = headers[0]
            assert isinstance(header, HeaderTuple)
            assert not isinstance(header, NeverIndexedHeaderTuple)
    612 
    def test_literal_never_indexed_emits_neverindexedheadertuple(self):
        """
        A never-indexed literal header field is decoded into a
        NeverIndexedHeaderTuple.
        """
        wire = b'\x10\x0acustom-key\x0dcustom-header'

        decoded = Decoder().decode(wire)

        assert len(decoded) == 1
        assert isinstance(decoded[0], NeverIndexedHeaderTuple)
    626 
    def test_indexed_never_indexed_emits_neverindexedheadertuple(self):
        """
        A never-indexed header field that refers to its name by index is
        decoded into a NeverIndexedHeaderTuple.
        """
        wire = b'\x14\x0c/sample/path'

        decoded = Decoder().decode(wire)

        assert len(decoded) == 1
        assert isinstance(decoded[0], NeverIndexedHeaderTuple)
    640 
    def test_max_header_list_size(self):
        """
        Decoding a header block that exceeds max_header_list_size raises
        OversizedHeaderListError.
        """
        oversized = b'\x14\x0c/sample/path'
        decoder = Decoder(max_header_list_size=44)

        with pytest.raises(OversizedHeaderListError):
            decoder.decode(oversized)
    651 
    def test_can_decode_multiple_header_table_size_changes(self):
        """
        A block that opens with more than one header table size change is
        decoded successfully.
        """
        wire = b'?a?\xe1\x1f\x82\x87\x84A\x8a\x08\x9d\\\x0b\x81p\xdcy\xa6\x99'

        decoded = Decoder().decode(wire)

        assert decoded == [
            (':method', 'GET'),
            (':scheme', 'https'),
            (':path', '/'),
            (':authority', '127.0.0.1:8443')
        ]
    667 
    def test_header_table_size_change_above_maximum(self):
        """
        A received table size change larger than the maximum allowed table
        size is rejected with InvalidTableSizeError.
        """
        wire = b'?a\x82\x87\x84A\x8a\x08\x9d\\\x0b\x81p\xdcy\xa6\x99'

        decoder = Decoder()
        decoder.max_allowed_table_size = 127

        with pytest.raises(InvalidTableSizeError):
            decoder.decode(wire)
    679 
    def test_table_size_not_adjusting(self):
        """
        When the allowed table size has been shrunk and the peer's block
        carries no matching size change, InvalidTableSizeError is raised.
        """
        wire = b'\x82\x87\x84A\x8a\x08\x9d\\\x0b\x81p\xdcy\xa6\x99'

        decoder = Decoder()
        decoder.max_allowed_table_size = 128

        with pytest.raises(InvalidTableSizeError):
            decoder.decode(wire)
    691 
    def test_table_size_last_rejected(self):
        """
        A header table size change placed at the end of a header block is
        forbidden and raises HPACKDecodingError.
        """
        wire = b'\x82\x87\x84A\x8a\x08\x9d\\\x0b\x81p\xdcy\xa6\x99?a'
        decoder = Decoder()

        with pytest.raises(HPACKDecodingError):
            decoder.decode(wire)
    702 
    def test_table_size_middle_rejected(self):
        """
        A header table size change placed anywhere other than the start of
        a header block is forbidden and raises HPACKDecodingError.
        """
        wire = b'\x82?a\x87\x84A\x8a\x08\x9d\\\x0b\x81p\xdcy\xa6\x99'
        decoder = Decoder()

        with pytest.raises(HPACKDecodingError):
            decoder.decode(wire)
    713 
    def test_truncated_header_name(self):
        """
        A block whose final header name is cut short raises
        HPACKDecodingError.
        """
        # A simple header block, followed by a deliberately bad ending.
        leading_block = b'\x82\x87\x84A\x8a\x08\x9d\\\x0b\x81p\xdcy\xa6\x99'
        # The ending announces a literal name and value, where the name is
        # a 5 character huffman-encoded string, yet only three bytes follow.
        truncated_name = b'\x00\x85\xf2\xb2J'

        decoder = Decoder()
        with pytest.raises(HPACKDecodingError):
            decoder.decode(leading_block + truncated_name)
    730 
    def test_truncated_header_value(self):
        """
        A block whose final header value is cut short raises
        HPACKDecodingError.
        """
        # A simple header block, followed by a deliberately bad ending.
        leading_block = b'\x82\x87\x84A\x8a\x08\x9d\\\x0b\x81p\xdcy\xa6\x99'
        # The ending announces a literal name and value. The name is a 5
        # character huffman-encoded string, but the entire EOS character has
        # been written over the end. hpack therefore sees the header value
        # as being supposed to be 622462 bytes long, which it clearly is
        # not, so decoding must fail.
        truncated_value = b'\x00\x85\xf2\xb2J\x87\xff\xff\xff\xfd%B\x7f'

        decoder = Decoder()
        with pytest.raises(HPACKDecodingError):
            decoder.decode(leading_block + truncated_value)
    749 
    750 
    751 class TestDictToIterable:
    752    """
    753    The dict_to_iterable function has some subtle requirements: validates that
    754    everything behaves as expected.
    755 
    756    As much as possible this tries to be exhaustive.
    757    """
    758    keys = one_of(
    759        text().filter(lambda k: k and not k.startswith(':')),
    760        binary().filter(lambda k: k and not k.startswith(b':'))
    761    )
    762 
    763    @given(
    764        special_keys=sets(keys),
    765        boring_keys=sets(keys),
    766    )
    767    def test_ordering(self, special_keys, boring_keys):
    768        """
    769        _dict_to_iterable produces an iterable where all the keys beginning
    770        with a colon are emitted first.
    771        """
    772        def _prepend_colon(k):
    773            if isinstance(k, str):
    774                return ':' + k
    775            else:
    776                return b':' + k
    777 
    778        special_keys = set(map(_prepend_colon, special_keys))
    779        input_dict = {
    780            k: b'testval' for k in itertools.chain(
    781                special_keys,
    782                boring_keys
    783            )
    784        }
    785        filtered = _dict_to_iterable(input_dict)
    786 
    787        received_special = set()
    788        received_boring = set()
    789 
    790        for _ in special_keys:
    791            k, _ = next(filtered)
    792            received_special.add(k)
    793        for _ in boring_keys:
    794            k, _ = next(filtered)
    795            received_boring.add(k)
    796 
    797        assert special_keys == received_special
    798        assert boring_keys == received_boring
    799 
    800    @given(
    801        special_keys=sets(keys),
    802        boring_keys=sets(keys),
    803    )
    804    def test_ordering_applies_to_encoding(self, special_keys, boring_keys):
    805        """
    806        When encoding a dictionary the special keys all appear first.
    807        """
    808        def _prepend_colon(k):
    809            if isinstance(k, str):
    810                return ':' + k
    811            else:
    812                return b':' + k
    813 
    814        special_keys = set(map(_prepend_colon, special_keys))
    815        input_dict = {
    816            k: b'testval' for k in itertools.chain(
    817                special_keys,
    818                boring_keys
    819            )
    820        }
    821        e = Encoder()
    822        d = Decoder()
    823        encoded = e.encode(input_dict)
    824        decoded = iter(d.decode(encoded, raw=True))
    825 
    826        received_special = set()
    827        received_boring = set()
    828        expected_special = set(map(_to_bytes, special_keys))
    829        expected_boring = set(map(_to_bytes, boring_keys))
    830 
    831        for _ in special_keys:
    832            k, _ = next(decoded)
    833            received_special.add(k)
    834        for _ in boring_keys:
    835            k, _ = next(decoded)
    836            received_boring.add(k)
    837 
    838        assert expected_special == received_special
    839        assert expected_boring == received_boring