tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

stream.py (35191B)


      1 # Copyright 2011, Google Inc.
      2 # All rights reserved.
      3 #
      4 # Redistribution and use in source and binary forms, with or without
      5 # modification, are permitted provided that the following conditions are
      6 # met:
      7 #
      8 #     * Redistributions of source code must retain the above copyright
      9 # notice, this list of conditions and the following disclaimer.
     10 #     * Redistributions in binary form must reproduce the above
     11 # copyright notice, this list of conditions and the following disclaimer
     12 # in the documentation and/or other materials provided with the
     13 # distribution.
     14 #     * Neither the name of Google Inc. nor the names of its
     15 # contributors may be used to endorse or promote products derived from
     16 # this software without specific prior written permission.
     17 #
     18 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
     19 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
     20 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
     21 # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
     22 # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
     23 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
     24 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
     25 # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
     26 # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
     27 # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
     28 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
     29 """This file provides classes and helper functions for parsing/building frames
     30 of the WebSocket protocol (RFC 6455).
     31 
     32 Specification:
     33 http://tools.ietf.org/html/rfc6455
     34 """
     35 
     36 from collections import deque
     37 import logging
     38 import os
     39 import struct
     40 import time
     41 import socket
     42 import six
     43 
     44 from mod_pywebsocket import common
     45 from mod_pywebsocket import util
     46 from mod_pywebsocket._stream_exceptions import BadOperationException
     47 from mod_pywebsocket._stream_exceptions import ConnectionTerminatedException
     48 from mod_pywebsocket._stream_exceptions import InvalidFrameException
     49 from mod_pywebsocket._stream_exceptions import InvalidUTF8Exception
     50 from mod_pywebsocket._stream_exceptions import UnsupportedFrameException
     51 
     52 _NOOP_MASKER = util.NoopMasker()
     53 
     54 
     55 class Frame(object):
     56    def __init__(self,
     57                 fin=1,
     58                 rsv1=0,
     59                 rsv2=0,
     60                 rsv3=0,
     61                 opcode=None,
     62                 payload=b''):
     63        self.fin = fin
     64        self.rsv1 = rsv1
     65        self.rsv2 = rsv2
     66        self.rsv3 = rsv3
     67        self.opcode = opcode
     68        self.payload = payload
     69 
     70 
     71 # Helper functions made public to be used for writing unittests for WebSocket
     72 # clients.
     73 
     74 
     75 def create_length_header(length, mask):
     76    """Creates a length header.
     77 
     78    Args:
     79        length: Frame length. Must be less than 2^63.
     80        mask: Mask bit. Must be boolean.
     81 
     82    Raises:
     83        ValueError: when bad data is given.
     84    """
     85 
     86    if mask:
     87        mask_bit = 1 << 7
     88    else:
     89        mask_bit = 0
     90 
     91    if length < 0:
     92        raise ValueError('length must be non negative integer')
     93    elif length <= 125:
     94        return util.pack_byte(mask_bit | length)
     95    elif length < (1 << 16):
     96        return util.pack_byte(mask_bit | 126) + struct.pack('!H', length)
     97    elif length < (1 << 63):
     98        return util.pack_byte(mask_bit | 127) + struct.pack('!Q', length)
     99    else:
    100        raise ValueError('Payload is too big for one frame')
    101 
    102 
    103 def create_header(opcode, payload_length, fin, rsv1, rsv2, rsv3, mask):
    104    """Creates a frame header.
    105 
    106    Raises:
    107        Exception: when bad data is given.
    108    """
    109 
    110    if opcode < 0 or 0xf < opcode:
    111        raise ValueError('Opcode out of range')
    112 
    113    if payload_length < 0 or (1 << 63) <= payload_length:
    114        raise ValueError('payload_length out of range')
    115 
    116    if (fin | rsv1 | rsv2 | rsv3) & ~1:
    117        raise ValueError('FIN bit and Reserved bit parameter must be 0 or 1')
    118 
    119    header = b''
    120 
    121    first_byte = ((fin << 7)
    122                  | (rsv1 << 6) | (rsv2 << 5) | (rsv3 << 4)
    123                  | opcode)
    124    header += util.pack_byte(first_byte)
    125    header += create_length_header(payload_length, mask)
    126 
    127    return header
    128 
    129 
    130 def _build_frame(header, body, mask):
    131    if not mask:
    132        return header + body
    133 
    134    masking_nonce = os.urandom(4)
    135    masker = util.RepeatedXorMasker(masking_nonce)
    136 
    137    return header + masking_nonce + masker.mask(body)
    138 
    139 
    140 def _filter_and_format_frame_object(frame, mask, frame_filters):
    141    for frame_filter in frame_filters:
    142        frame_filter.filter(frame)
    143 
    144    header = create_header(frame.opcode, len(frame.payload), frame.fin,
    145                           frame.rsv1, frame.rsv2, frame.rsv3, mask)
    146    return _build_frame(header, frame.payload, mask)
    147 
    148 
    149 def create_binary_frame(message,
    150                        opcode=common.OPCODE_BINARY,
    151                        fin=1,
    152                        mask=False,
    153                        frame_filters=[]):
    154    """Creates a simple binary frame with no extension, reserved bit."""
    155 
    156    frame = Frame(fin=fin, opcode=opcode, payload=message)
    157    return _filter_and_format_frame_object(frame, mask, frame_filters)
    158 
    159 
    160 def create_text_frame(message,
    161                      opcode=common.OPCODE_TEXT,
    162                      fin=1,
    163                      mask=False,
    164                      frame_filters=[]):
    165    """Creates a simple text frame with no extension, reserved bit."""
    166 
    167    encoded_message = message.encode('utf-8')
    168    return create_binary_frame(encoded_message, opcode, fin, mask,
    169                               frame_filters)
    170 
    171 
    172 def parse_frame(receive_bytes,
    173                logger=None,
    174                ws_version=common.VERSION_HYBI_LATEST,
    175                unmask_receive=True):
    176    """Parses a frame. Returns a tuple containing each header field and
    177    payload.
    178 
    179    Args:
    180        receive_bytes: a function that reads frame data from a stream or
    181            something similar. The function takes length of the bytes to be
    182            read. The function must raise ConnectionTerminatedException if
    183            there is not enough data to be read.
    184        logger: a logging object.
    185        ws_version: the version of WebSocket protocol.
    186        unmask_receive: unmask received frames. When received unmasked
    187            frame, raises InvalidFrameException.
    188 
    189    Raises:
    190        ConnectionTerminatedException: when receive_bytes raises it.
    191        InvalidFrameException: when the frame contains invalid data.
    192    """
    193 
    194    if not logger:
    195        logger = logging.getLogger()
    196 
    197    logger.log(common.LOGLEVEL_FINE, 'Receive the first 2 octets of a frame')
    198 
    199    first_byte = ord(receive_bytes(1))
    200    fin = (first_byte >> 7) & 1
    201    rsv1 = (first_byte >> 6) & 1
    202    rsv2 = (first_byte >> 5) & 1
    203    rsv3 = (first_byte >> 4) & 1
    204    opcode = first_byte & 0xf
    205 
    206    second_byte = ord(receive_bytes(1))
    207    mask = (second_byte >> 7) & 1
    208    payload_length = second_byte & 0x7f
    209 
    210    logger.log(
    211        common.LOGLEVEL_FINE, 'FIN=%s, RSV1=%s, RSV2=%s, RSV3=%s, opcode=%s, '
    212        'Mask=%s, Payload_length=%s', fin, rsv1, rsv2, rsv3, opcode, mask,
    213        payload_length)
    214 
    215    if (mask == 1) != unmask_receive:
    216        raise InvalidFrameException(
    217            'Mask bit on the received frame did\'nt match masking '
    218            'configuration for received frames')
    219 
    220    # The HyBi and later specs disallow putting a value in 0x0-0xFFFF
    221    # into the 8-octet extended payload length field (or 0x0-0xFD in
    222    # 2-octet field).
    223    valid_length_encoding = True
    224    length_encoding_bytes = 1
    225    if payload_length == 127:
    226        logger.log(common.LOGLEVEL_FINE,
    227                   'Receive 8-octet extended payload length')
    228 
    229        extended_payload_length = receive_bytes(8)
    230        payload_length = struct.unpack('!Q', extended_payload_length)[0]
    231        if payload_length > 0x7FFFFFFFFFFFFFFF:
    232            raise InvalidFrameException('Extended payload length >= 2^63')
    233        if ws_version >= 13 and payload_length < 0x10000:
    234            valid_length_encoding = False
    235            length_encoding_bytes = 8
    236 
    237        logger.log(common.LOGLEVEL_FINE, 'Decoded_payload_length=%s',
    238                   payload_length)
    239    elif payload_length == 126:
    240        logger.log(common.LOGLEVEL_FINE,
    241                   'Receive 2-octet extended payload length')
    242 
    243        extended_payload_length = receive_bytes(2)
    244        payload_length = struct.unpack('!H', extended_payload_length)[0]
    245        if ws_version >= 13 and payload_length < 126:
    246            valid_length_encoding = False
    247            length_encoding_bytes = 2
    248 
    249        logger.log(common.LOGLEVEL_FINE, 'Decoded_payload_length=%s',
    250                   payload_length)
    251 
    252    if not valid_length_encoding:
    253        logger.warning(
    254            'Payload length is not encoded using the minimal number of '
    255            'bytes (%d is encoded using %d bytes)', payload_length,
    256            length_encoding_bytes)
    257 
    258    if mask == 1:
    259        logger.log(common.LOGLEVEL_FINE, 'Receive mask')
    260 
    261        masking_nonce = receive_bytes(4)
    262        masker = util.RepeatedXorMasker(masking_nonce)
    263 
    264        logger.log(common.LOGLEVEL_FINE, 'Mask=%r', masking_nonce)
    265    else:
    266        masker = _NOOP_MASKER
    267 
    268    logger.log(common.LOGLEVEL_FINE, 'Receive payload data')
    269    if logger.isEnabledFor(common.LOGLEVEL_FINE):
    270        receive_start = time.time()
    271 
    272    raw_payload_bytes = receive_bytes(payload_length)
    273 
    274    if logger.isEnabledFor(common.LOGLEVEL_FINE):
    275        logger.log(
    276            common.LOGLEVEL_FINE, 'Done receiving payload data at %s MB/s',
    277            payload_length / (time.time() - receive_start) / 1000 / 1000)
    278    logger.log(common.LOGLEVEL_FINE, 'Unmask payload data')
    279 
    280    if logger.isEnabledFor(common.LOGLEVEL_FINE):
    281        unmask_start = time.time()
    282 
    283    unmasked_bytes = masker.mask(raw_payload_bytes)
    284 
    285    if logger.isEnabledFor(common.LOGLEVEL_FINE):
    286        logger.log(common.LOGLEVEL_FINE,
    287                   'Done unmasking payload data at %s MB/s',
    288                   payload_length / (time.time() - unmask_start) / 1000 / 1000)
    289 
    290    return opcode, unmasked_bytes, fin, rsv1, rsv2, rsv3
    291 
    292 
    293 class FragmentedFrameBuilder(object):
    294    """A stateful class to send a message as fragments."""
    295    def __init__(self, mask, frame_filters=[], encode_utf8=True):
    296        """Constructs an instance."""
    297 
    298        self._mask = mask
    299        self._frame_filters = frame_filters
    300        # This is for skipping UTF-8 encoding when building text type frames
    301        # from compressed data.
    302        self._encode_utf8 = encode_utf8
    303 
    304        self._started = False
    305 
    306        # Hold opcode of the first frame in messages to verify types of other
    307        # frames in the message are all the same.
    308        self._opcode = common.OPCODE_TEXT
    309 
    310    def build(self, payload_data, end, binary):
    311        if binary:
    312            frame_type = common.OPCODE_BINARY
    313        else:
    314            frame_type = common.OPCODE_TEXT
    315        if self._started:
    316            if self._opcode != frame_type:
    317                raise ValueError('Message types are different in frames for '
    318                                 'the same message')
    319            opcode = common.OPCODE_CONTINUATION
    320        else:
    321            opcode = frame_type
    322            self._opcode = frame_type
    323 
    324        if end:
    325            self._started = False
    326            fin = 1
    327        else:
    328            self._started = True
    329            fin = 0
    330 
    331        if binary or not self._encode_utf8:
    332            return create_binary_frame(payload_data, opcode, fin, self._mask,
    333                                       self._frame_filters)
    334        else:
    335            return create_text_frame(payload_data, opcode, fin, self._mask,
    336                                     self._frame_filters)
    337 
    338 
    339 def _create_control_frame(opcode, body, mask, frame_filters):
    340    frame = Frame(opcode=opcode, payload=body)
    341 
    342    for frame_filter in frame_filters:
    343        frame_filter.filter(frame)
    344 
    345    if len(frame.payload) > 125:
    346        raise BadOperationException(
    347            'Payload data size of control frames must be 125 bytes or less')
    348 
    349    header = create_header(frame.opcode, len(frame.payload), frame.fin,
    350                           frame.rsv1, frame.rsv2, frame.rsv3, mask)
    351    return _build_frame(header, frame.payload, mask)
    352 
    353 
    354 def create_ping_frame(body, mask=False, frame_filters=[]):
    355    return _create_control_frame(common.OPCODE_PING, body, mask, frame_filters)
    356 
    357 
    358 def create_pong_frame(body, mask=False, frame_filters=[]):
    359    return _create_control_frame(common.OPCODE_PONG, body, mask, frame_filters)
    360 
    361 
    362 def create_close_frame(body, mask=False, frame_filters=[]):
    363    return _create_control_frame(common.OPCODE_CLOSE, body, mask,
    364                                 frame_filters)
    365 
    366 
    367 def create_closing_handshake_body(code, reason):
    368    body = b''
    369    if code is not None:
    370        if (code > common.STATUS_USER_PRIVATE_MAX
    371                or code < common.STATUS_NORMAL_CLOSURE):
    372            raise BadOperationException('Status code is out of range')
    373        if (code == common.STATUS_NO_STATUS_RECEIVED
    374                or code == common.STATUS_ABNORMAL_CLOSURE
    375                or code == common.STATUS_TLS_HANDSHAKE):
    376            raise BadOperationException('Status code is reserved pseudo '
    377                                        'code')
    378        encoded_reason = reason.encode('utf-8')
    379        body = struct.pack('!H', code) + encoded_reason
    380    return body
    381 
    382 
    383 class StreamOptions(object):
    384    """Holds option values to configure Stream objects."""
    385    def __init__(self):
    386        """Constructs StreamOptions."""
    387 
    388        # Filters applied to frames.
    389        self.outgoing_frame_filters = []
    390        self.incoming_frame_filters = []
    391 
    392        # Filters applied to messages. Control frames are not affected by them.
    393        self.outgoing_message_filters = []
    394        self.incoming_message_filters = []
    395 
    396        self.encode_text_message_to_utf8 = True
    397        self.mask_send = False
    398        self.unmask_receive = True
    399 
    400 
    401 class Stream(object):
    402    """A class for parsing/building frames of the WebSocket protocol
    403    (RFC 6455).
    404    """
    405    def __init__(self, request, options):
    406        """Constructs an instance.
    407 
    408        Args:
    409            request: mod_python request.
    410        """
    411 
    412        self._logger = util.get_class_logger(self)
    413 
    414        self._options = options
    415        self._request = request
    416 
    417        self._request.client_terminated = False
    418        self._request.server_terminated = False
    419 
    420        # Holds body of received fragments.
    421        self._received_fragments = []
    422        # Holds the opcode of the first fragment.
    423        self._original_opcode = None
    424 
    425        self._writer = FragmentedFrameBuilder(
    426            self._options.mask_send, self._options.outgoing_frame_filters,
    427            self._options.encode_text_message_to_utf8)
    428 
    429        self._ping_queue = deque()
    430 
    431    def _read(self, length):
    432        """Reads length bytes from connection. In case we catch any exception,
    433        prepends remote address to the exception message and raise again.
    434 
    435        Raises:
    436            ConnectionTerminatedException: when read returns empty string.
    437        """
    438 
    439        try:
    440            read_bytes = self._request.connection.read(length)
    441            if not read_bytes:
    442                raise ConnectionTerminatedException(
    443                    'Receiving %d byte failed. Peer (%r) closed connection' %
    444                    (length, (self._request.connection.remote_addr, )))
    445            return read_bytes
    446        except IOError as e:
    447            # Also catch an IOError because mod_python throws it.
    448            raise ConnectionTerminatedException(
    449                'Receiving %d byte failed. IOError (%s) occurred' %
    450                (length, e))
    451 
    452    def _write(self, bytes_to_write):
    453        """Writes given bytes to connection. In case we catch any exception,
    454        prepends remote address to the exception message and raise again.
    455        """
    456 
    457        try:
    458            self._request.connection.write(bytes_to_write)
    459        except Exception as e:
    460            util.prepend_message_to_exception(
    461                'Failed to send message to %r: ' %
    462                (self._request.connection.remote_addr, ), e)
    463            raise
    464 
    465    def receive_bytes(self, length):
    466        """Receives multiple bytes. Retries read when we couldn't receive the
    467        specified amount. This method returns byte strings.
    468 
    469        Raises:
    470            ConnectionTerminatedException: when read returns empty string.
    471        """
    472 
    473        read_bytes = []
    474        while length > 0:
    475            new_read_bytes = self._read(length)
    476            read_bytes.append(new_read_bytes)
    477            length -= len(new_read_bytes)
    478        return b''.join(read_bytes)
    479 
    480    def _read_until(self, delim_char):
    481        """Reads bytes until we encounter delim_char. The result will not
    482        contain delim_char.
    483 
    484        Raises:
    485            ConnectionTerminatedException: when read returns empty string.
    486        """
    487 
    488        read_bytes = []
    489        while True:
    490            ch = self._read(1)
    491            if ch == delim_char:
    492                break
    493            read_bytes.append(ch)
    494        return b''.join(read_bytes)
    495 
    496    def _receive_frame(self):
    497        """Receives a frame and return data in the frame as a tuple containing
    498        each header field and payload separately.
    499 
    500        Raises:
    501            ConnectionTerminatedException: when read returns empty
    502                string.
    503            InvalidFrameException: when the frame contains invalid data.
    504        """
    505        def _receive_bytes(length):
    506            return self.receive_bytes(length)
    507 
    508        return parse_frame(receive_bytes=_receive_bytes,
    509                           logger=self._logger,
    510                           ws_version=self._request.ws_version,
    511                           unmask_receive=self._options.unmask_receive)
    512 
    513    def _receive_frame_as_frame_object(self):
    514        opcode, unmasked_bytes, fin, rsv1, rsv2, rsv3 = self._receive_frame()
    515 
    516        return Frame(fin=fin,
    517                     rsv1=rsv1,
    518                     rsv2=rsv2,
    519                     rsv3=rsv3,
    520                     opcode=opcode,
    521                     payload=unmasked_bytes)
    522 
    523    def receive_filtered_frame(self):
    524        """Receives a frame and applies frame filters and message filters.
    525        The frame to be received must satisfy following conditions:
    526        - The frame is not fragmented.
    527        - The opcode of the frame is TEXT or BINARY.
    528 
    529        DO NOT USE this method except for testing purpose.
    530        """
    531 
    532        frame = self._receive_frame_as_frame_object()
    533        if not frame.fin:
    534            raise InvalidFrameException(
    535                'Segmented frames must not be received via '
    536                'receive_filtered_frame()')
    537        if (frame.opcode != common.OPCODE_TEXT
    538                and frame.opcode != common.OPCODE_BINARY):
    539            raise InvalidFrameException(
    540                'Control frames must not be received via '
    541                'receive_filtered_frame()')
    542 
    543        for frame_filter in self._options.incoming_frame_filters:
    544            frame_filter.filter(frame)
    545        for message_filter in self._options.incoming_message_filters:
    546            frame.payload = message_filter.filter(frame.payload)
    547        return frame
    548 
    549    def send_message(self, message, end=True, binary=False):
    550        """Send message.
    551 
    552        Args:
    553            message: text in unicode or binary in str to send.
    554            binary: send message as binary frame.
    555 
    556        Raises:
    557            BadOperationException: when called on a server-terminated
    558                connection or called with inconsistent message type or
    559                binary parameter.
    560        """
    561 
    562        if self._request.server_terminated:
    563            raise BadOperationException(
    564                'Requested send_message after sending out a closing handshake')
    565 
    566        if binary and isinstance(message, six.text_type):
    567            raise BadOperationException(
    568                'Message for binary frame must not be instance of Unicode')
    569 
    570        for message_filter in self._options.outgoing_message_filters:
    571            message = message_filter.filter(message, end, binary)
    572 
    573        try:
    574            # Set this to any positive integer to limit maximum size of data in
    575            # payload data of each frame.
    576            MAX_PAYLOAD_DATA_SIZE = -1
    577 
    578            if MAX_PAYLOAD_DATA_SIZE <= 0:
    579                self._write(self._writer.build(message, end, binary))
    580                return
    581 
    582            bytes_written = 0
    583            while True:
    584                end_for_this_frame = end
    585                bytes_to_write = len(message) - bytes_written
    586                if (MAX_PAYLOAD_DATA_SIZE > 0
    587                        and bytes_to_write > MAX_PAYLOAD_DATA_SIZE):
    588                    end_for_this_frame = False
    589                    bytes_to_write = MAX_PAYLOAD_DATA_SIZE
    590 
    591                frame = self._writer.build(
    592                    message[bytes_written:bytes_written + bytes_to_write],
    593                    end_for_this_frame, binary)
    594                self._write(frame)
    595 
    596                bytes_written += bytes_to_write
    597 
    598                # This if must be placed here (the end of while block) so that
    599                # at least one frame is sent.
    600                if len(message) <= bytes_written:
    601                    break
    602        except ValueError as e:
    603            raise BadOperationException(e)
    604 
    605    def _get_message_from_frame(self, frame):
    606        """Gets a message from frame. If the message is composed of fragmented
    607        frames and the frame is not the last fragmented frame, this method
    608        returns None. The whole message will be returned when the last
    609        fragmented frame is passed to this method.
    610 
    611        Raises:
    612            InvalidFrameException: when the frame doesn't match defragmentation
    613                context, or the frame contains invalid data.
    614        """
    615 
    616        if frame.opcode == common.OPCODE_CONTINUATION:
    617            if not self._received_fragments:
    618                if frame.fin:
    619                    raise InvalidFrameException(
    620                        'Received a termination frame but fragmentation '
    621                        'not started')
    622                else:
    623                    raise InvalidFrameException(
    624                        'Received an intermediate frame but '
    625                        'fragmentation not started')
    626 
    627            if frame.fin:
    628                # End of fragmentation frame
    629                self._received_fragments.append(frame.payload)
    630                message = b''.join(self._received_fragments)
    631                self._received_fragments = []
    632                return message
    633            else:
    634                # Intermediate frame
    635                self._received_fragments.append(frame.payload)
    636                return None
    637        else:
    638            if self._received_fragments:
    639                if frame.fin:
    640                    raise InvalidFrameException(
    641                        'Received an unfragmented frame without '
    642                        'terminating existing fragmentation')
    643                else:
    644                    raise InvalidFrameException(
    645                        'New fragmentation started without terminating '
    646                        'existing fragmentation')
    647 
    648            if frame.fin:
    649                # Unfragmented frame
    650 
    651                self._original_opcode = frame.opcode
    652                return frame.payload
    653            else:
    654                # Start of fragmentation frame
    655 
    656                if common.is_control_opcode(frame.opcode):
    657                    raise InvalidFrameException(
    658                        'Control frames must not be fragmented')
    659 
    660                self._original_opcode = frame.opcode
    661                self._received_fragments.append(frame.payload)
    662                return None
    663 
    664    def _process_close_message(self, message):
    665        """Processes close message.
    666 
    667        Args:
    668            message: close message.
    669 
    670        Raises:
    671            InvalidFrameException: when the message is invalid.
    672        """
    673 
    674        self._request.client_terminated = True
    675 
    676        # Status code is optional. We can have status reason only if we
    677        # have status code. Status reason can be empty string. So,
    678        # allowed cases are
    679        # - no application data: no code no reason
    680        # - 2 octet of application data: has code but no reason
    681        # - 3 or more octet of application data: both code and reason
    682        if len(message) == 0:
    683            self._logger.debug('Received close frame (empty body)')
    684            self._request.ws_close_code = common.STATUS_NO_STATUS_RECEIVED
    685        elif len(message) == 1:
    686            raise InvalidFrameException(
    687                'If a close frame has status code, the length of '
    688                'status code must be 2 octet')
    689        elif len(message) >= 2:
    690            self._request.ws_close_code = struct.unpack('!H', message[0:2])[0]
    691            self._request.ws_close_reason = message[2:].decode(
    692                'utf-8', 'replace')
    693            self._logger.debug('Received close frame (code=%d, reason=%r)',
    694                               self._request.ws_close_code,
    695                               self._request.ws_close_reason)
    696 
    697        # As we've received a close frame, no more data is coming over the
    698        # socket. We can now safely close the socket without worrying about
    699        # RST sending.
    700 
    701        if self._request.server_terminated:
    702            self._logger.debug(
    703                'Received ack for server-initiated closing handshake')
    704            return
    705 
    706        self._logger.debug('Received client-initiated closing handshake')
    707 
    708        code = common.STATUS_NORMAL_CLOSURE
    709        reason = ''
    710        if hasattr(self._request, '_dispatcher'):
    711            dispatcher = self._request._dispatcher
    712            code, reason = dispatcher.passive_closing_handshake(self._request)
    713            if code is None and reason is not None and len(reason) > 0:
    714                self._logger.warning(
    715                    'Handler specified reason despite code being None')
    716                reason = ''
    717            if reason is None:
    718                reason = ''
    719        self._send_closing_handshake(code, reason)
    720        self._logger.debug(
    721            'Acknowledged closing handshake initiated by the peer '
    722            '(code=%r, reason=%r)', code, reason)
    723 
    def _process_ping_message(self, message):
        """Processes ping message.

        If the request object has a truthy on_ping_handler attribute, that
        handler is called with the request and the ping body, and this
        method sends no pong itself. Otherwise a pong carrying the same body
        is sent via _send_pong.

        Args:
            message: ping message.
        Raises:
            Whatever the installed handler raises. This includes
            AttributeError, which is no longer mistaken for "no handler
            installed".
        """

        # Look the optional handler up with a default instead of wrapping the
        # call in try/except AttributeError; otherwise an AttributeError
        # raised by a bug inside the handler would be silently swallowed and
        # answered with a pong.
        handler = getattr(self._request, 'on_ping_handler', None)
        if handler:
            handler(self._request, message)
            return
        self._send_pong(message)
    739 
    def _process_pong_message(self, message):
        """Processes pong message.

        Pops bodies from the front of self._ping_queue until one equals the
        pong body. Bodies popped before the match are dropped. If no queued
        body matches, the queue is restored to its previous contents.
        Afterwards the request's on_pong_handler, if present and truthy, is
        called with the request and the pong body.

        Args:
            message: pong message.
        Raises:
            Whatever the installed handler raises. This includes
            AttributeError, which is no longer mistaken for "no handler
            installed".
        """

        # TODO(tyoshino): Add ping timeout handling.

        inflight_pings = deque()

        while True:
            # Only popleft() is expected to raise IndexError (empty queue),
            # so keep the try limited to that call.
            try:
                expected_body = self._ping_queue.popleft()
            except IndexError:
                # The received pong was unsolicited pong. Everything popped
                # so far is in inflight_pings, in order; put it back so the
                # ping queue is kept as is.
                self._ping_queue = inflight_pings
                self._logger.debug('Received a unsolicited pong')
                break

            if expected_body == message:
                # inflight_pings contains pings ignored by the
                # other peer. Just forget them.
                self._logger.debug(
                    'Ping %r is acked (%d pings were ignored)',
                    expected_body, len(inflight_pings))
                break

            inflight_pings.append(expected_body)

        # Look the optional handler up with a default instead of wrapping the
        # call in try/except AttributeError; otherwise an AttributeError
        # raised by a bug inside the handler would be silently swallowed.
        handler = getattr(self._request, 'on_pong_handler', None)
        if handler:
            handler(self._request, message)
    776 
    def receive_message(self):
        """Reads frames until a data or close message can be handed back.

        Ping and pong messages are dispatched to _process_ping_message and
        _process_pong_message respectively, after which reading continues.

        Returns:
            - the payload decoded from UTF-8 if the message opcode is text
            - the payload as produced by the incoming message filters if
              the message opcode is binary
            - None if a close message was received (it is passed to
              _process_close_message first).
        Raises:
            BadOperationException: when called while the request's
                client_terminated flag is set.
            ConnectionTerminatedException: may propagate from the frame
                reading helper (per the original documentation: when read
                returns empty string).
            InvalidFrameException: when a control frame has a payload
                longer than 125 bytes; the frame reading helpers may raise
                it for other invalid data as well.
            InvalidUTF8Exception: when the payload of a text message is not
                valid UTF-8.
            UnsupportedFrameException: when a reserved bit is still set
                after the incoming frame filters ran, or the message opcode
                is not text, binary, close, ping or pong. You can ignore
                this exception and continue receiving the next frame.
        """

        if self._request.client_terminated:
            raise BadOperationException(
                'Requested receive_message after receiving a closing '
                'handshake')

        while True:
            # NOTE(review): presumably blocks until a whole frame has been
            # read from the connection -- confirm against the frame reader.
            frame = self._receive_frame_as_frame_object()

            # The size limit for control frames is enforced on the raw
            # frame, i.e. before any extension (frame filter) touches it.
            # See also http://tools.ietf.org/html/rfc6455#section-5.5
            is_control = common.is_control_opcode(frame.opcode)
            if is_control and len(frame.payload) > 125:
                raise InvalidFrameException(
                    'Payload data size of control frames must be 125 bytes or '
                    'less')

            for incoming_filter in self._options.incoming_frame_filters:
                incoming_filter.filter(frame)

            # Any reserved bit left set after filtering is not understood.
            rsv_bits = (frame.rsv1, frame.rsv2, frame.rsv3)
            if any(rsv_bits):
                raise UnsupportedFrameException(
                    'Unsupported flag is set (rsv = %d%d%d)' % rsv_bits)

            message = self._get_message_from_frame(frame)
            if message is None:
                # No complete message yet (presumably more fragments are
                # needed); go read the next frame.
                continue

            for incoming_filter in self._options.incoming_message_filters:
                message = incoming_filter.filter(message)

            opcode = self._original_opcode

            if opcode == common.OPCODE_TEXT:
                # Invalid UTF-8 in a text message is reported to the caller
                # as InvalidUTF8Exception; it is not replaced with U+FFFD.
                try:
                    return message.decode('utf-8')
                except UnicodeDecodeError as e:
                    raise InvalidUTF8Exception(e)

            if opcode == common.OPCODE_BINARY:
                return message

            if opcode == common.OPCODE_CLOSE:
                self._process_close_message(message)
                return None

            if opcode == common.OPCODE_PING:
                self._process_ping_message(message)
                continue

            if opcode == common.OPCODE_PONG:
                self._process_pong_message(message)
                continue

            raise UnsupportedFrameException('Opcode %d is not supported' %
                                            opcode)
    852 
    def _send_closing_handshake(self, code, reason):
        """Builds a close frame from code and reason and writes it out.

        The request's server_terminated flag is set to True after the frame
        has been built and before it is written.

        Args:
            code: status code passed to create_closing_handshake_body.
            reason: close reason passed to create_closing_handshake_body.
        """

        close_frame = create_close_frame(
            create_closing_handshake_body(code, reason),
            mask=self._options.mask_send,
            frame_filters=self._options.outgoing_frame_filters)

        # Mark our side as terminated before the frame goes out.
        self._request.server_terminated = True

        self._write(close_frame)
    863 
    def close_connection(self,
                         code=common.STATUS_NORMAL_CLOSURE,
                         reason='',
                         wait_response=True):
        """Initiates the closing handshake from this side.

        Does nothing if the request's server_terminated flag is already set.
        Otherwise a close frame is sent and, unless told otherwise, the
        method calls receive_message() to wait for the peer's close frame.

        Args:
            code: Status code for close frame. If code is None, a close
                frame with empty body will be sent.
            reason: string representing close reason.
            wait_response: True when the caller wants to wait for the
                response. The wait is skipped regardless when code is
                STATUS_GOING_AWAY or STATUS_PROTOCOL_ERROR.
        Raises:
            BadOperationException: when a non-empty reason is specified
                with code None, or when code is not None and reason is
                neither a bytes nor a unicode instance.
            ConnectionTerminatedException: when the message received while
                waiting for the acknowledgement is not a close message.
        """

        if self._request.server_terminated:
            self._logger.debug(
                'Requested close_connection but server is already terminated')
            return

        # A close frame received earlier would already have been
        # acknowledged, and the acknowledgement goes through
        # _send_closing_handshake(), which sets server_terminated. Since the
        # flag is still unset here, no close frame has been received and we
        # are the side initiating the closing handshake.

        if code is not None:
            if not isinstance(reason, (bytes, six.text_type)):
                raise BadOperationException(
                    'close reason must be an instance of bytes or unicode')
        else:
            if reason is not None and len(reason) > 0:
                raise BadOperationException(
                    'close reason must not be specified if code is None')
            reason = ''

        self._send_closing_handshake(code, reason)
        self._logger.debug('Initiated closing handshake (code=%r, reason=%r)',
                           code, reason)

        # It doesn't make sense to wait for a close frame if the reason is
        # protocol error or that the server is going away. For some of
        # other reasons, it might not make sense to wait for a close frame,
        # but it's not clear, yet.
        ack_not_awaited = code in (common.STATUS_GOING_AWAY,
                                   common.STATUS_PROTOCOL_ERROR)
        if ack_not_awaited or not wait_response:
            return

        # TODO(ukai): 2. wait until the /client terminated/ flag has been set,
        # or until a server-defined timeout expires.
        #
        # For now, we expect receiving closing handshake right after sending
        # out closing handshake. receive_message() returns None only for a
        # close message, so anything else is not a valid acknowledgement.
        if self.receive_message() is not None:
            raise ConnectionTerminatedException(
                'Didn\'t receive valid ack for closing handshake')
        # TODO: 3. close the WebSocket connection.
        # note: mod_python Connection (mp_conn) doesn't have close method.
    926 
    def send_ping(self, body, binary=False):
        """Sends a ping frame and remembers its body.

        The body, as sent, is appended to self._ping_queue, the queue that
        _process_pong_message pops from when a pong arrives.

        Args:
            body: ping payload. A unicode instance is encoded as UTF-8
                unless binary is true.
            binary: when true, body is sent without being encoded.
        """

        must_encode = not binary and isinstance(body, six.text_type)
        if must_encode:
            body = body.encode('UTF-8')

        options = self._options
        ping_frame = create_ping_frame(body, options.mask_send,
                                       options.outgoing_frame_filters)
        self._write(ping_frame)

        # Queue the body only once the frame has been written.
        self._ping_queue.append(body)
    935 
    def _send_pong(self, body):
        """Builds a pong frame carrying body and writes it out.

        Args:
            body: payload for the pong frame.
        """

        options = self._options
        pong_frame = create_pong_frame(body, options.mask_send,
                                       options.outgoing_frame_filters)
        self._write(pong_frame)
    940 
    def get_last_received_opcode(self):
        """Returns the opcode of the message the last received frame
        belongs to.

        The value is only meaningful immediately after a receive_message
        call.
        """

        last_opcode = self._original_opcode
        return last_opcode
    948 
    949 
    950 # vi:sts=4 sw=4 et