tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

test_frames.py (30598B)


      1 # -*- coding: utf-8 -*-
      2 from hyperframe.frame import (
      3    Frame, Flags, DataFrame, PriorityFrame, RstStreamFrame, SettingsFrame,
      4    PushPromiseFrame, PingFrame, GoAwayFrame, WindowUpdateFrame, HeadersFrame,
      5    ContinuationFrame, AltSvcFrame, ExtensionFrame
      6 )
      7 from hyperframe.exceptions import (
      8    UnknownFrameError, InvalidPaddingError, InvalidFrameError, InvalidDataError
      9 )
     10 import pytest
     11 
     12 
     13 def decode_frame(frame_data):
     14    f, length = Frame.parse_frame_header(frame_data[:9])
     15    f.parse_body(memoryview(frame_data[9:9 + length]))
     16    assert 9 + length == len(frame_data)
     17    return f
     18 
     19 
     20 class TestGeneralFrameBehaviour:
     21    def test_base_frame_ignores_flags(self):
     22        f = Frame(0)
     23        flags = f.parse_flags(0xFF)
     24        assert not flags
     25        assert isinstance(flags, Flags)
     26 
     27    def test_base_frame_cant_serialize(self):
     28        f = Frame(0)
     29        with pytest.raises(NotImplementedError):
     30            f.serialize()
     31 
     32    def test_base_frame_cant_parse_body(self):
     33        data = b''
     34        f = Frame(0)
     35        with pytest.raises(NotImplementedError):
     36            f.parse_body(data)
     37 
     38    def test_parse_frame_header_unknown_type_strict(self):
     39        with pytest.raises(UnknownFrameError) as excinfo:
     40            Frame.parse_frame_header(
     41                b'\x00\x00\x59\xFF\x00\x00\x00\x00\x01',
     42                strict=True
     43            )
     44        exception = excinfo.value
     45        assert exception.frame_type == 0xFF
     46        assert exception.length == 0x59
     47        assert str(exception) == (
     48            "UnknownFrameError: Unknown frame type 0xFF received, "
     49            "length 89 bytes"
     50        )
     51 
     52    def test_parse_frame_header_ignore_first_bit_of_stream_id(self):
     53        s = b'\x00\x00\x00\x06\x01\x80\x00\x00\x00'
     54        f, _ = Frame.parse_frame_header(s)
     55 
     56        assert f.stream_id == 0
     57 
     58    def test_parse_frame_header_unknown_type(self):
     59        frame, length = Frame.parse_frame_header(
     60            b'\x00\x00\x59\xFF\x00\x00\x00\x00\x01'
     61        )
     62        assert frame.type == 0xFF
     63        assert length == 0x59
     64        assert isinstance(frame, ExtensionFrame)
     65        assert frame.stream_id == 1
     66 
     67    def test_flags_are_persisted(self):
     68        frame, length = Frame.parse_frame_header(
     69            b'\x00\x00\x59\xFF\x09\x00\x00\x00\x01'
     70        )
     71        assert frame.type == 0xFF
     72        assert length == 0x59
     73        assert frame.flag_byte == 0x09
     74 
     75    def test_parse_body_unknown_type(self):
     76        frame = decode_frame(
     77            b'\x00\x00\x0C\xFF\x00\x00\x00\x00\x01hello world!'
     78        )
     79        assert frame.body == b'hello world!'
     80        assert frame.body_len == 12
     81        assert frame.stream_id == 1
     82 
     83    def test_can_round_trip_unknown_frames(self):
     84        frame_data = b'\x00\x00\x0C\xFF\x00\x00\x00\x00\x01hello world!'
     85        f = decode_frame(frame_data)
     86        assert f.serialize() == frame_data
     87 
     88    def test_repr(self, monkeypatch):
     89        f = Frame(0)
     90        monkeypatch.setattr(Frame, "serialize_body", lambda _: b"body")
     91        assert repr(f) == (
     92            "Frame(stream_id=0, flags=[]): <hex:626f6479>"
     93        )
     94 
     95        f.stream_id = 42
     96        f.flags = ["END_STREAM", "PADDED"]
     97        assert repr(f) == (
     98            "Frame(stream_id=42, flags=['END_STREAM', 'PADDED']): <hex:626f6479>"
     99        )
    100 
    101        monkeypatch.setattr(Frame, "serialize_body", lambda _: b"A"*25)
    102        assert repr(f) == (
    103            "Frame(stream_id=42, flags=['END_STREAM', 'PADDED']): <hex:{}...>".format("41"*10)
    104        )
    105 
    106    def test_frame_explain(self, capsys):
    107        d = b'\x00\x00\x08\x00\x01\x00\x00\x00\x01testdata'
    108        Frame.explain(memoryview(d))
    109        captured = capsys.readouterr()
    110        assert captured.out.strip() == "DataFrame(stream_id=1, flags=['END_STREAM']): <hex:7465737464617461>"
    111 
    112    def test_cannot_parse_invalid_frame_header(self):
    113        with pytest.raises(InvalidFrameError):
    114            Frame.parse_frame_header(b'\x00\x00\x08\x00\x01\x00\x00\x00')
    115 
    116 
    117 class TestDataFrame:
    118    payload = b'\x00\x00\x08\x00\x01\x00\x00\x00\x01testdata'
    119    payload_with_padding = (
    120        b'\x00\x00\x13\x00\x09\x00\x00\x00\x01\x0Atestdata' + b'\0' * 10
    121    )
    122 
    123    def test_repr(self):
    124        f = DataFrame(1, b"testdata")
    125        assert repr(f).endswith("<hex:7465737464617461>")
    126 
    127    def test_data_frame_has_correct_flags(self):
    128        f = DataFrame(1)
    129        flags = f.parse_flags(0xFF)
    130        assert flags == set([
    131            'END_STREAM', 'PADDED'
    132        ])
    133 
    134    @pytest.mark.parametrize('data', [
    135        b'testdata',
    136        memoryview(b'testdata')
    137    ])
    138    def test_data_frame_serializes_properly(self, data):
    139        f = DataFrame(1)
    140        f.flags = set(['END_STREAM'])
    141        f.data = data
    142 
    143        s = f.serialize()
    144        assert s == self.payload
    145 
    146    def test_data_frame_with_padding_serializes_properly(self):
    147        f = DataFrame(1)
    148        f.flags = set(['END_STREAM', 'PADDED'])
    149        f.data = b'testdata'
    150        f.pad_length = 10
    151 
    152        s = f.serialize()
    153        assert s == self.payload_with_padding
    154 
    155    def test_data_frame_parses_properly(self):
    156        f = decode_frame(self.payload)
    157 
    158        assert isinstance(f, DataFrame)
    159        assert f.flags == set(['END_STREAM'])
    160        assert f.pad_length == 0
    161        assert f.data == b'testdata'
    162        assert f.body_len == 8
    163 
    164    def test_data_frame_with_padding_parses_properly(self):
    165        f = decode_frame(self.payload_with_padding)
    166 
    167        assert isinstance(f, DataFrame)
    168        assert f.flags == set(['END_STREAM', 'PADDED'])
    169        assert f.pad_length == 10
    170        assert f.data == b'testdata'
    171        assert f.body_len == 19
    172 
    173    def test_data_frame_with_invalid_padding_errors(self):
    174        with pytest.raises(InvalidFrameError):
    175            decode_frame(self.payload_with_padding[:9])
    176 
    177    def test_data_frame_with_padding_calculates_flow_control_len(self):
    178        f = DataFrame(1)
    179        f.flags = set(['PADDED'])
    180        f.data = b'testdata'
    181        f.pad_length = 10
    182 
    183        assert f.flow_controlled_length == 19
    184 
    185    def test_data_frame_zero_length_padding_calculates_flow_control_len(self):
    186        f = DataFrame(1)
    187        f.flags = set(['PADDED'])
    188        f.data = b'testdata'
    189        f.pad_length = 0
    190 
    191        assert f.flow_controlled_length == len(b'testdata') + 1
    192 
    193    def test_data_frame_without_padding_calculates_flow_control_len(self):
    194        f = DataFrame(1)
    195        f.data = b'testdata'
    196 
    197        assert f.flow_controlled_length == 8
    198 
    199    def test_data_frame_comes_on_a_stream(self):
    200        with pytest.raises(InvalidDataError):
    201            DataFrame(0)
    202 
    203    def test_long_data_frame(self):
    204        f = DataFrame(1)
    205 
    206        # Use more than 256 bytes of data to force setting higher bits.
    207        f.data = b'\x01' * 300
    208        data = f.serialize()
    209 
    210        # The top three bytes should be numerically equal to 300. That means
    211        # they should read 00 01 2C.
    212        # The weird double index trick is to ensure this test behaves equally
    213        # on Python 2 and Python 3.
    214        assert data[0] == b'\x00'[0]
    215        assert data[1] == b'\x01'[0]
    216        assert data[2] == b'\x2C'[0]
    217 
    218    def test_body_length_behaves_correctly(self):
    219        f = DataFrame(1)
    220 
    221        f.data = b'\x01' * 300
    222 
    223        # Initially the body length is zero. For now this is incidental, but
    224        # I'm going to test it to ensure that the behaviour is codified. We
    225        # should change this test if we change that.
    226        assert f.body_len == 0
    227 
    228        f.serialize()
    229        assert f.body_len == 300
    230 
    231    def test_data_frame_with_invalid_padding_fails_to_parse(self):
    232        # This frame has a padding length of 6 bytes, but a total length of
    233        # only 5.
    234        data = b'\x00\x00\x05\x00\x0b\x00\x00\x00\x01\x06\x54\x65\x73\x74'
    235 
    236        with pytest.raises(InvalidPaddingError):
    237            decode_frame(data)
    238 
    239    def test_data_frame_with_no_length_parses(self):
    240        # Fixes issue with empty data frames raising InvalidPaddingError.
    241        f = DataFrame(1)
    242        f.data = b''
    243        data = f.serialize()
    244 
    245        new_frame = decode_frame(data)
    246        assert new_frame.data == b''
    247 
    248 
    249 class TestPriorityFrame:
    250    payload = b'\x00\x00\x05\x02\x00\x00\x00\x00\x01\x80\x00\x00\x04\x40'
    251 
    252    def test_repr(self):
    253        f = PriorityFrame(1)
    254        assert repr(f).endswith("exclusive=False, depends_on=0, stream_weight=0")
    255        f.exclusive = True
    256        f.depends_on = 0x04
    257        f.stream_weight = 64
    258        assert repr(f).endswith("exclusive=True, depends_on=4, stream_weight=64")
    259 
    260    def test_priority_frame_has_no_flags(self):
    261        f = PriorityFrame(1)
    262        flags = f.parse_flags(0xFF)
    263        assert flags == set()
    264        assert isinstance(flags, Flags)
    265 
    266    def test_priority_frame_default_serializes_properly(self):
    267        f = PriorityFrame(1)
    268 
    269        assert f.serialize() == (
    270            b'\x00\x00\x05\x02\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00'
    271        )
    272 
    273    def test_priority_frame_with_all_data_serializes_properly(self):
    274        f = PriorityFrame(1)
    275        f.depends_on = 0x04
    276        f.stream_weight = 64
    277        f.exclusive = True
    278 
    279        assert f.serialize() == self.payload
    280 
    281    def test_priority_frame_with_all_data_parses_properly(self):
    282        f = decode_frame(self.payload)
    283 
    284        assert isinstance(f, PriorityFrame)
    285        assert f.flags == set()
    286        assert f.depends_on == 4
    287        assert f.stream_weight == 64
    288        assert f.exclusive is True
    289        assert f.body_len == 5
    290 
    291    def test_priority_frame_invalid(self):
    292        with pytest.raises(InvalidFrameError):
    293            decode_frame(
    294                b'\x00\x00\x06\x02\x00\x00\x00\x00\x01\x80\x00\x00\x04\x40\xFF'
    295            )
    296 
    297    def test_priority_frame_comes_on_a_stream(self):
    298        with pytest.raises(InvalidDataError):
    299            PriorityFrame(0)
    300 
    301    def test_short_priority_frame_errors(self):
    302        with pytest.raises(InvalidFrameError):
    303            decode_frame(self.payload[:-2])
    304 
    305 
    306 class TestRstStreamFrame:
    307    def test_repr(self):
    308        f = RstStreamFrame(1)
    309        assert repr(f).endswith("error_code=0")
    310        f.error_code = 420
    311        assert repr(f).endswith("error_code=420")
    312 
    313    def test_rst_stream_frame_has_no_flags(self):
    314        f = RstStreamFrame(1)
    315        flags = f.parse_flags(0xFF)
    316        assert not flags
    317        assert isinstance(flags, Flags)
    318 
    319    def test_rst_stream_frame_serializes_properly(self):
    320        f = RstStreamFrame(1)
    321        f.error_code = 420
    322 
    323        s = f.serialize()
    324        assert s == b'\x00\x00\x04\x03\x00\x00\x00\x00\x01\x00\x00\x01\xa4'
    325 
    326    def test_rst_stream_frame_parses_properly(self):
    327        s = b'\x00\x00\x04\x03\x00\x00\x00\x00\x01\x00\x00\x01\xa4'
    328        f = decode_frame(s)
    329 
    330        assert isinstance(f, RstStreamFrame)
    331        assert f.flags == set()
    332        assert f.error_code == 420
    333        assert f.body_len == 4
    334 
    335    def test_rst_stream_frame_comes_on_a_stream(self):
    336        with pytest.raises(InvalidDataError):
    337            RstStreamFrame(0)
    338 
    339    def test_rst_stream_frame_must_have_body_length_four(self):
    340        f = RstStreamFrame(1)
    341        with pytest.raises(InvalidFrameError):
    342            f.parse_body(b'\x01')
    343 
    344 
    345 class TestSettingsFrame:
    346    serialized = (
    347        b'\x00\x00\x2A\x04\x01\x00\x00\x00\x00' +  # Frame header
    348        b'\x00\x01\x00\x00\x10\x00' +              # HEADER_TABLE_SIZE
    349        b'\x00\x02\x00\x00\x00\x00' +              # ENABLE_PUSH
    350        b'\x00\x03\x00\x00\x00\x64' +              # MAX_CONCURRENT_STREAMS
    351        b'\x00\x04\x00\x00\xFF\xFF' +              # INITIAL_WINDOW_SIZE
    352        b'\x00\x05\x00\x00\x40\x00' +              # MAX_FRAME_SIZE
    353        b'\x00\x06\x00\x00\xFF\xFF' +              # MAX_HEADER_LIST_SIZE
    354        b'\x00\x08\x00\x00\x00\x01'                # ENABLE_CONNECT_PROTOCOL
    355    )
    356 
    357    settings = {
    358        SettingsFrame.HEADER_TABLE_SIZE: 4096,
    359        SettingsFrame.ENABLE_PUSH: 0,
    360        SettingsFrame.MAX_CONCURRENT_STREAMS: 100,
    361        SettingsFrame.INITIAL_WINDOW_SIZE: 65535,
    362        SettingsFrame.MAX_FRAME_SIZE: 16384,
    363        SettingsFrame.MAX_HEADER_LIST_SIZE: 65535,
    364        SettingsFrame.ENABLE_CONNECT_PROTOCOL: 1,
    365    }
    366 
    367    def test_repr(self):
    368        f = SettingsFrame()
    369        assert repr(f).endswith("settings={}")
    370        f.settings[SettingsFrame.MAX_FRAME_SIZE] = 16384
    371        assert repr(f).endswith("settings={5: 16384}")
    372 
    373    def test_settings_frame_has_only_one_flag(self):
    374        f = SettingsFrame()
    375        flags = f.parse_flags(0xFF)
    376        assert flags == set(['ACK'])
    377 
    378    def test_settings_frame_serializes_properly(self):
    379        f = SettingsFrame()
    380        f.parse_flags(0xFF)
    381        f.settings = self.settings
    382 
    383        s = f.serialize()
    384        assert s == self.serialized
    385 
    386    def test_settings_frame_with_settings(self):
    387        f = SettingsFrame(settings=self.settings)
    388        assert f.settings == self.settings
    389 
    390    def test_settings_frame_without_settings(self):
    391        f = SettingsFrame()
    392        assert f.settings == {}
    393 
    394    def test_settings_frame_with_ack(self):
    395        f = SettingsFrame(flags=('ACK',))
    396        assert 'ACK' in f.flags
    397 
    398    def test_settings_frame_ack_and_settings(self):
    399        with pytest.raises(InvalidDataError):
    400            SettingsFrame(settings=self.settings, flags=('ACK',))
    401 
    402        with pytest.raises(InvalidDataError):
    403            decode_frame(self.serialized)
    404 
    405    def test_settings_frame_parses_properly(self):
    406        # unset the ACK flag to allow correct parsing
    407        data = self.serialized[:4] + b"\x00" + self.serialized[5:]
    408 
    409        f = decode_frame(data)
    410 
    411        assert isinstance(f, SettingsFrame)
    412        assert f.flags == set()
    413        assert f.settings == self.settings
    414        assert f.body_len == 42
    415 
    416    def test_settings_frame_invalid_body_length(self):
    417        with pytest.raises(InvalidFrameError):
    418            decode_frame(
    419                b'\x00\x00\x2A\x04\x00\x00\x00\x00\x00\xFF\xFF\xFF\xFF'
    420            )
    421 
    422    def test_settings_frames_never_have_streams(self):
    423        with pytest.raises(InvalidDataError):
    424            SettingsFrame(1)
    425 
    426    def test_short_settings_frame_errors(self):
    427        with pytest.raises(InvalidDataError):
    428            decode_frame(self.serialized[:-2])
    429 
    430 
    431 class TestPushPromiseFrame:
    432    def test_repr(self):
    433        f = PushPromiseFrame(1)
    434        assert repr(f).endswith("promised_stream_id=0, data=None")
    435        f.promised_stream_id = 4
    436        f.data = b"testdata"
    437        assert repr(f).endswith("promised_stream_id=4, data=<hex:7465737464617461>")
    438 
    439    def test_push_promise_frame_flags(self):
    440        f = PushPromiseFrame(1)
    441        flags = f.parse_flags(0xFF)
    442 
    443        assert flags == set(['END_HEADERS', 'PADDED'])
    444 
    445    def test_push_promise_frame_serializes_properly(self):
    446        f = PushPromiseFrame(1)
    447        f.flags = set(['END_HEADERS'])
    448        f.promised_stream_id = 4
    449        f.data = b'hello world'
    450 
    451        s = f.serialize()
    452        assert s == (
    453            b'\x00\x00\x0F\x05\x04\x00\x00\x00\x01' +
    454            b'\x00\x00\x00\x04' +
    455            b'hello world'
    456        )
    457 
    458    def test_push_promise_frame_parses_properly(self):
    459        s = (
    460            b'\x00\x00\x0F\x05\x04\x00\x00\x00\x01' +
    461            b'\x00\x00\x00\x04' +
    462            b'hello world'
    463        )
    464        f = decode_frame(s)
    465 
    466        assert isinstance(f, PushPromiseFrame)
    467        assert f.flags == set(['END_HEADERS'])
    468        assert f.promised_stream_id == 4
    469        assert f.data == b'hello world'
    470        assert f.body_len == 15
    471 
    472    def test_push_promise_frame_with_padding(self):
    473        s = (
    474            b'\x00\x00\x17\x05\x0C\x00\x00\x00\x01' +
    475            b'\x07\x00\x00\x00\x04' +
    476            b'hello world' +
    477            b'padding'
    478        )
    479        f = decode_frame(s)
    480 
    481        assert isinstance(f, PushPromiseFrame)
    482        assert f.flags == set(['END_HEADERS', 'PADDED'])
    483        assert f.promised_stream_id == 4
    484        assert f.data == b'hello world'
    485        assert f.body_len == 23
    486 
    487    def test_push_promise_frame_with_invalid_padding_fails_to_parse(self):
    488        # This frame has a padding length of 6 bytes, but a total length of
    489        # only 5.
    490        data = b'\x00\x00\x05\x05\x08\x00\x00\x00\x01\x06\x54\x65\x73\x74'
    491 
    492        with pytest.raises(InvalidPaddingError):
    493            decode_frame(data)
    494 
    495    def test_push_promise_frame_with_no_length_parses(self):
    496        # Fixes issue with empty data frames raising InvalidPaddingError.
    497        f = PushPromiseFrame(1, 2)
    498        f.data = b''
    499        data = f.serialize()
    500 
    501        new_frame = decode_frame(data)
    502        assert new_frame.data == b''
    503 
    504    def test_push_promise_frame_invalid(self):
    505        data = PushPromiseFrame(1, 0).serialize()
    506        with pytest.raises(InvalidDataError):
    507            decode_frame(data)
    508 
    509        data = PushPromiseFrame(1, 3).serialize()
    510        with pytest.raises(InvalidDataError):
    511            decode_frame(data)
    512 
    513    def test_short_push_promise_errors(self):
    514        s = (
    515            b'\x00\x00\x0F\x05\x04\x00\x00\x00\x01' +
    516            b'\x00\x00\x00'  # One byte short
    517        )
    518 
    519        with pytest.raises(InvalidFrameError):
    520            decode_frame(s)
    521 
    522 
    523 class TestPingFrame:
    524    def test_repr(self):
    525        f = PingFrame()
    526        assert repr(f).endswith("opaque_data=b''")
    527        f.opaque_data = b'hello'
    528        assert repr(f).endswith("opaque_data=b'hello'")
    529 
    530    def test_ping_frame_has_only_one_flag(self):
    531        f = PingFrame()
    532        flags = f.parse_flags(0xFF)
    533 
    534        assert flags == set(['ACK'])
    535 
    536    def test_ping_frame_serializes_properly(self):
    537        f = PingFrame()
    538        f.parse_flags(0xFF)
    539        f.opaque_data = b'\x01\x02'
    540 
    541        s = f.serialize()
    542        assert s == (
    543            b'\x00\x00\x08\x06\x01\x00\x00\x00\x00\x01\x02\x00\x00\x00\x00\x00'
    544            b'\x00'
    545        )
    546 
    547    def test_no_more_than_8_octets(self):
    548        f = PingFrame()
    549        f.opaque_data = b'\x01\x02\x03\x04\x05\x06\x07\x08\x09'
    550 
    551        with pytest.raises(InvalidFrameError):
    552            f.serialize()
    553 
    554    def test_ping_frame_parses_properly(self):
    555        s = (
    556            b'\x00\x00\x08\x06\x01\x00\x00\x00\x00\x01\x02\x00\x00\x00\x00\x00'
    557            b'\x00'
    558        )
    559        f = decode_frame(s)
    560 
    561        assert isinstance(f, PingFrame)
    562        assert f.flags == set(['ACK'])
    563        assert f.opaque_data == b'\x01\x02\x00\x00\x00\x00\x00\x00'
    564        assert f.body_len == 8
    565 
    566    def test_ping_frame_never_has_a_stream(self):
    567        with pytest.raises(InvalidDataError):
    568            PingFrame(1)
    569 
    570    def test_ping_frame_has_no_more_than_body_length_8(self):
    571        f = PingFrame()
    572        with pytest.raises(InvalidFrameError):
    573            f.parse_body(b'\x01\x02\x03\x04\x05\x06\x07\x08\x09')
    574 
    575    def test_ping_frame_has_no_less_than_body_length_8(self):
    576        f = PingFrame()
    577        with pytest.raises(InvalidFrameError):
    578            f.parse_body(b'\x01\x02\x03\x04\x05\x06\x07')
    579 
    580 
    581 class TestGoAwayFrame:
    582    def test_repr(self):
    583        f = GoAwayFrame()
    584        assert repr(f).endswith("last_stream_id=0, error_code=0, additional_data=b''")
    585        f.last_stream_id = 64
    586        f.error_code = 32
    587        f.additional_data = b'hello'
    588        assert repr(f).endswith("last_stream_id=64, error_code=32, additional_data=b'hello'")
    589 
    590    def test_go_away_has_no_flags(self):
    591        f = GoAwayFrame()
    592        flags = f.parse_flags(0xFF)
    593 
    594        assert not flags
    595        assert isinstance(flags, Flags)
    596 
    597    def test_goaway_serializes_properly(self):
    598        f = GoAwayFrame()
    599        f.last_stream_id = 64
    600        f.error_code = 32
    601        f.additional_data = b'hello'
    602 
    603        s = f.serialize()
    604        assert s == (
    605            b'\x00\x00\x0D\x07\x00\x00\x00\x00\x00' +  # Frame header
    606            b'\x00\x00\x00\x40' +                      # Last Stream ID
    607            b'\x00\x00\x00\x20' +                      # Error Code
    608            b'hello'                                   # Additional data
    609        )
    610 
    611    def test_goaway_frame_parses_properly(self):
    612        s = (
    613            b'\x00\x00\x0D\x07\x00\x00\x00\x00\x00' +  # Frame header
    614            b'\x00\x00\x00\x40' +                      # Last Stream ID
    615            b'\x00\x00\x00\x20' +                      # Error Code
    616            b'hello'                                   # Additional data
    617        )
    618        f = decode_frame(s)
    619 
    620        assert isinstance(f, GoAwayFrame)
    621        assert f.flags == set()
    622        assert f.additional_data == b'hello'
    623        assert f.body_len == 13
    624 
    625        s = (
    626            b'\x00\x00\x08\x07\x00\x00\x00\x00\x00' +  # Frame header
    627            b'\x00\x00\x00\x40' +                      # Last Stream ID
    628            b'\x00\x00\x00\x20' +                      # Error Code
    629            b''                                        # Additional data
    630        )
    631        f = decode_frame(s)
    632 
    633        assert isinstance(f, GoAwayFrame)
    634        assert f.flags == set()
    635        assert f.additional_data == b''
    636        assert f.body_len == 8
    637 
    638    def test_goaway_frame_never_has_a_stream(self):
    639        with pytest.raises(InvalidDataError):
    640            GoAwayFrame(1)
    641 
    642    def test_short_goaway_frame_errors(self):
    643        s = (
    644            b'\x00\x00\x0D\x07\x00\x00\x00\x00\x00' +  # Frame header
    645            b'\x00\x00\x00\x40' +                      # Last Stream ID
    646            b'\x00\x00\x00'                            # short Error Code
    647        )
    648        with pytest.raises(InvalidFrameError):
    649            decode_frame(s)
    650 
    651 
    652 class TestWindowUpdateFrame:
    653    def test_repr(self):
    654        f = WindowUpdateFrame(0)
    655        assert repr(f).endswith("window_increment=0")
    656        f.stream_id = 1
    657        f.window_increment = 512
    658        assert repr(f).endswith("window_increment=512")
    659 
    660    def test_window_update_has_no_flags(self):
    661        f = WindowUpdateFrame(0)
    662        flags = f.parse_flags(0xFF)
    663 
    664        assert not flags
    665        assert isinstance(flags, Flags)
    666 
    667    def test_window_update_serializes_properly(self):
    668        f = WindowUpdateFrame(0)
    669        f.window_increment = 512
    670 
    671        s = f.serialize()
    672        assert s == b'\x00\x00\x04\x08\x00\x00\x00\x00\x00\x00\x00\x02\x00'
    673 
    674    def test_windowupdate_frame_parses_properly(self):
    675        s = b'\x00\x00\x04\x08\x00\x00\x00\x00\x00\x00\x00\x02\x00'
    676        f = decode_frame(s)
    677 
    678        assert isinstance(f, WindowUpdateFrame)
    679        assert f.flags == set()
    680        assert f.window_increment == 512
    681        assert f.body_len == 4
    682 
    683    def test_short_windowupdate_frame_errors(self):
    684        s = b'\x00\x00\x04\x08\x00\x00\x00\x00\x00\x00\x00\x02'  # -1 byte
    685        with pytest.raises(InvalidFrameError):
    686            decode_frame(s)
    687 
    688        s = b'\x00\x00\x05\x08\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02'
    689        with pytest.raises(InvalidFrameError):
    690            decode_frame(s)
    691 
    692        with pytest.raises(InvalidDataError):
    693            decode_frame(WindowUpdateFrame(0).serialize())
    694 
    695        with pytest.raises(InvalidDataError):
    696            decode_frame(WindowUpdateFrame(2**31).serialize())
    697 
    698 
    699 class TestHeadersFrame:
    700    def test_repr(self):
    701        f = HeadersFrame(1)
    702        assert repr(f).endswith("exclusive=False, depends_on=0, stream_weight=0, data=None")
    703        f.data = b'hello'
    704        f.exclusive = True
    705        f.depends_on = 42
    706        f.stream_weight = 64
    707        assert repr(f).endswith("exclusive=True, depends_on=42, stream_weight=64, data=<hex:68656c6c6f>")
    708 
    709    def test_headers_frame_flags(self):
    710        f = HeadersFrame(1)
    711        flags = f.parse_flags(0xFF)
    712 
    713        assert flags == set(['END_STREAM', 'END_HEADERS',
    714                             'PADDED', 'PRIORITY'])
    715 
    716    def test_headers_frame_serializes_properly(self):
    717        f = HeadersFrame(1)
    718        f.flags = set(['END_STREAM', 'END_HEADERS'])
    719        f.data = b'hello world'
    720 
    721        s = f.serialize()
    722        assert s == (
    723            b'\x00\x00\x0B\x01\x05\x00\x00\x00\x01' +
    724            b'hello world'
    725        )
    726 
    727    def test_headers_frame_parses_properly(self):
    728        s = (
    729            b'\x00\x00\x0B\x01\x05\x00\x00\x00\x01' +
    730            b'hello world'
    731        )
    732        f = decode_frame(s)
    733 
    734        assert isinstance(f, HeadersFrame)
    735        assert f.flags == set(['END_STREAM', 'END_HEADERS'])
    736        assert f.data == b'hello world'
    737        assert f.body_len == 11
    738 
    739    def test_headers_frame_with_priority_parses_properly(self):
    740        # This test also tests that we can receive a HEADERS frame with no
    741        # actual headers on it. This is technically possible.
    742        s = (
    743            b'\x00\x00\x05\x01\x20\x00\x00\x00\x01' +
    744            b'\x80\x00\x00\x04\x40'
    745        )
    746        f = decode_frame(s)
    747 
    748        assert isinstance(f, HeadersFrame)
    749        assert f.flags == set(['PRIORITY'])
    750        assert f.data == b''
    751        assert f.depends_on == 4
    752        assert f.stream_weight == 64
    753        assert f.exclusive is True
    754        assert f.body_len == 5
    755 
    756    def test_headers_frame_with_priority_serializes_properly(self):
    757        # This test also tests that we can receive a HEADERS frame with no
    758        # actual headers on it. This is technically possible.
    759        s = (
    760            b'\x00\x00\x05\x01\x20\x00\x00\x00\x01' +
    761            b'\x80\x00\x00\x04\x40'
    762        )
    763        f = HeadersFrame(1)
    764        f.flags = set(['PRIORITY'])
    765        f.data = b''
    766        f.depends_on = 4
    767        f.stream_weight = 64
    768        f.exclusive = True
    769 
    770        assert f.serialize() == s
    771 
    772    def test_headers_frame_with_invalid_padding_fails_to_parse(self):
    773        # This frame has a padding length of 6 bytes, but a total length of
    774        # only 5.
    775        data = b'\x00\x00\x05\x01\x08\x00\x00\x00\x01\x06\x54\x65\x73\x74'
    776 
    777        with pytest.raises(InvalidPaddingError):
    778            decode_frame(data)
    779 
    780    def test_headers_frame_with_no_length_parses(self):
    781        # Fixes issue with empty data frames raising InvalidPaddingError.
    782        f = HeadersFrame(1)
    783        f.data = b''
    784        data = f.serialize()
    785 
    786        new_frame = decode_frame(data)
    787        assert new_frame.data == b''
    788 
    789 
    790 class TestContinuationFrame:
    791    def test_repr(self):
    792        f = ContinuationFrame(1)
    793        assert repr(f).endswith("data=None")
    794        f.data = b'hello'
    795        assert repr(f).endswith("data=<hex:68656c6c6f>")
    796 
    797    def test_continuation_frame_flags(self):
    798        f = ContinuationFrame(1)
    799        flags = f.parse_flags(0xFF)
    800 
    801        assert flags == set(['END_HEADERS'])
    802 
    803    def test_continuation_frame_serializes(self):
    804        f = ContinuationFrame(1)
    805        f.parse_flags(0x04)
    806        f.data = b'hello world'
    807 
    808        s = f.serialize()
    809        assert s == (
    810            b'\x00\x00\x0B\x09\x04\x00\x00\x00\x01' +
    811            b'hello world'
    812        )
    813 
    814    def test_continuation_frame_parses_properly(self):
    815        s = b'\x00\x00\x0B\x09\x04\x00\x00\x00\x01hello world'
    816        f = decode_frame(s)
    817 
    818        assert isinstance(f, ContinuationFrame)
    819        assert f.flags == set(['END_HEADERS'])
    820        assert f.data == b'hello world'
    821        assert f.body_len == 11
    822 
    823 
    824 class TestAltSvcFrame:
    825    payload_with_origin = (
    826        b'\x00\x00\x31'  # Length
    827        b'\x0A'  # Type
    828        b'\x00'  # Flags
    829        b'\x00\x00\x00\x00'  # Stream ID
    830        b'\x00\x0B'  # Origin len
    831        b'example.com'  # Origin
    832        b'h2="alt.example.com:8000", h2=":443"'  # Field Value
    833    )
    834    payload_without_origin = (
    835        b'\x00\x00\x13'  # Length
    836        b'\x0A'  # Type
    837        b'\x00'  # Flags
    838        b'\x00\x00\x00\x01'  # Stream ID
    839        b'\x00\x00'  # Origin len
    840        b''  # Origin
    841        b'h2=":8000"; ma=60'  # Field Value
    842    )
    843    payload_with_origin_and_stream = (
    844        b'\x00\x00\x36'  # Length
    845        b'\x0A'  # Type
    846        b'\x00'  # Flags
    847        b'\x00\x00\x00\x01'  # Stream ID
    848        b'\x00\x0B'  # Origin len
    849        b'example.com'  # Origin
    850        b'Alt-Svc: h2=":443"; ma=2592000; persist=1'  # Field Value
    851    )
    852 
    853    def test_repr(self):
    854        f = AltSvcFrame(0)
    855        assert repr(f).endswith("origin=b'', field=b''")
    856        f.field = b'h2="alt.example.com:8000", h2=":443"'
    857        assert repr(f).endswith("origin=b'', field=b'h2=\"alt.example.com:8000\", h2=\":443\"'")
    858        f.origin = b'example.com'
    859        assert repr(f).endswith("origin=b'example.com', field=b'h2=\"alt.example.com:8000\", h2=\":443\"'")
    860 
    861    def test_altsvc_frame_flags(self):
    862        f = AltSvcFrame(0)
    863        flags = f.parse_flags(0xFF)
    864 
    865        assert flags == set()
    866 
    867    def test_altsvc_frame_with_origin_serializes_properly(self):
    868        f = AltSvcFrame(0)
    869        f.origin = b'example.com'
    870        f.field = b'h2="alt.example.com:8000", h2=":443"'
    871 
    872        s = f.serialize()
    873        assert s == self.payload_with_origin
    874 
    875    def test_altsvc_frame_with_origin_parses_properly(self):
    876        f = decode_frame(self.payload_with_origin)
    877 
    878        assert isinstance(f, AltSvcFrame)
    879        assert f.origin == b'example.com'
    880        assert f.field == b'h2="alt.example.com:8000", h2=":443"'
    881        assert f.body_len == 49
    882        assert f.stream_id == 0
    883 
    884    def test_altsvc_frame_without_origin_serializes_properly(self):
    885        f = AltSvcFrame(1, origin=b'', field=b'h2=":8000"; ma=60')
    886        s = f.serialize()
    887        assert s == self.payload_without_origin
    888 
    889    def test_altsvc_frame_without_origin_parses_properly(self):
    890        f = decode_frame(self.payload_without_origin)
    891 
    892        assert isinstance(f, AltSvcFrame)
    893        assert f.origin == b''
    894        assert f.field == b'h2=":8000"; ma=60'
    895        assert f.body_len == 19
    896        assert f.stream_id == 1
    897 
    898    def test_altsvc_frame_with_origin_and_stream_serializes_properly(self):
    899        # This frame is not valid, but we allow it to be serialized anyway.
    900        f = AltSvcFrame(1)
    901        f.origin = b'example.com'
    902        f.field = b'Alt-Svc: h2=":443"; ma=2592000; persist=1'
    903 
    904        assert f.serialize() == self.payload_with_origin_and_stream
    905 
    906    def test_short_altsvc_frame_errors(self):
    907        with pytest.raises(InvalidFrameError):
    908            decode_frame(self.payload_with_origin[:12])
    909 
    910        with pytest.raises(InvalidFrameError):
    911            decode_frame(self.payload_with_origin[:10])
    912 
    913    def test_altsvc_with_unicode_origin_fails(self):
    914        with pytest.raises(InvalidDataError):
    915            AltSvcFrame(
    916                stream_id=0, origin=u'hello', field=b'h2=":8000"; ma=60'
    917 
    918            )
    919 
    920    def test_altsvc_with_unicode_field_fails(self):
    921        with pytest.raises(InvalidDataError):
    922            AltSvcFrame(
    923                stream_id=0, origin=b'hello', field=u'h2=":8000"; ma=60'
    924            )
    925 
    926 
    927 class TestExtensionFrame:
    928    def test_repr(self):
    929        f = ExtensionFrame(0xFF, 1, 42, b'hello')
    930        assert repr(f).endswith("type=255, flag_byte=42, body=<hex:68656c6c6f>")