tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

asyncio-server.py (7261B)


      1 # -*- coding: utf-8 -*-
      2 """
      3 asyncio-server.py
      4 ~~~~~~~~~~~~~~~~~
      5 
      6 A fully-functional HTTP/2 server using asyncio. Requires Python 3.5+.
      7 
      8 This example demonstrates handling requests with bodies, as well as handling
      9 those without. In particular, it demonstrates the fact that DataReceived may
     10 be called multiple times, and that applications must handle that possibility.
     11 """
     12 import asyncio
     13 import io
     14 import json
     15 import ssl
     16 import collections
     17 from typing import List, Tuple
     18 
     19 from h2.config import H2Configuration
     20 from h2.connection import H2Connection
     21 from h2.events import (
     22    ConnectionTerminated, DataReceived, RemoteSettingsChanged,
     23    RequestReceived, StreamEnded, StreamReset, WindowUpdated
     24 )
     25 from h2.errors import ErrorCodes
     26 from h2.exceptions import ProtocolError, StreamClosedError
     27 from h2.settings import SettingCodes
     28 
     29 
     30 RequestData = collections.namedtuple('RequestData', ['headers', 'data'])
     31 
     32 
     33 class H2Protocol(asyncio.Protocol):
     34    def __init__(self):
     35        config = H2Configuration(client_side=False, header_encoding='utf-8')
     36        self.conn = H2Connection(config=config)
     37        self.transport = None
     38        self.stream_data = {}
     39        self.flow_control_futures = {}
     40 
     41    def connection_made(self, transport: asyncio.Transport):
     42        self.transport = transport
     43        self.conn.initiate_connection()
     44        self.transport.write(self.conn.data_to_send())
     45 
     46    def connection_lost(self, exc):
     47        for future in self.flow_control_futures.values():
     48            future.cancel()
     49        self.flow_control_futures = {}
     50 
     51    def data_received(self, data: bytes):
     52        try:
     53            events = self.conn.receive_data(data)
     54        except ProtocolError as e:
     55            self.transport.write(self.conn.data_to_send())
     56            self.transport.close()
     57        else:
     58            self.transport.write(self.conn.data_to_send())
     59            for event in events:
     60                if isinstance(event, RequestReceived):
     61                    self.request_received(event.headers, event.stream_id)
     62                elif isinstance(event, DataReceived):
     63                    self.receive_data(event.data, event.stream_id)
     64                elif isinstance(event, StreamEnded):
     65                    self.stream_complete(event.stream_id)
     66                elif isinstance(event, ConnectionTerminated):
     67                    self.transport.close()
     68                elif isinstance(event, StreamReset):
     69                    self.stream_reset(event.stream_id)
     70                elif isinstance(event, WindowUpdated):
     71                    self.window_updated(event.stream_id, event.delta)
     72                elif isinstance(event, RemoteSettingsChanged):
     73                    if SettingCodes.INITIAL_WINDOW_SIZE in event.changed_settings:
     74                        self.window_updated(None, 0)
     75 
     76                self.transport.write(self.conn.data_to_send())
     77 
     78    def request_received(self, headers: List[Tuple[str, str]], stream_id: int):
     79        headers = collections.OrderedDict(headers)
     80        method = headers[':method']
     81 
     82        # Store off the request data.
     83        request_data = RequestData(headers, io.BytesIO())
     84        self.stream_data[stream_id] = request_data
     85 
     86    def stream_complete(self, stream_id: int):
     87        """
     88        When a stream is complete, we can send our response.
     89        """
     90        try:
     91            request_data = self.stream_data[stream_id]
     92        except KeyError:
     93            # Just return, we probably 405'd this already
     94            return
     95 
     96        headers = request_data.headers
     97        body = request_data.data.getvalue().decode('utf-8')
     98 
     99        data = json.dumps(
    100            {"headers": headers, "body": body}, indent=4
    101        ).encode("utf8")
    102 
    103        response_headers = (
    104            (':status', '200'),
    105            ('content-type', 'application/json'),
    106            ('content-length', str(len(data))),
    107            ('server', 'asyncio-h2'),
    108        )
    109        self.conn.send_headers(stream_id, response_headers)
    110        asyncio.ensure_future(self.send_data(data, stream_id))
    111 
    112    def receive_data(self, data: bytes, stream_id: int):
    113        """
    114        We've received some data on a stream. If that stream is one we're
    115        expecting data on, save it off. Otherwise, reset the stream.
    116        """
    117        try:
    118            stream_data = self.stream_data[stream_id]
    119        except KeyError:
    120            self.conn.reset_stream(
    121                stream_id, error_code=ErrorCodes.PROTOCOL_ERROR
    122            )
    123        else:
    124            stream_data.data.write(data)
    125 
    126    def stream_reset(self, stream_id):
    127        """
    128        A stream reset was sent. Stop sending data.
    129        """
    130        if stream_id in self.flow_control_futures:
    131            future = self.flow_control_futures.pop(stream_id)
    132            future.cancel()
    133 
    134    async def send_data(self, data, stream_id):
    135        """
    136        Send data according to the flow control rules.
    137        """
    138        while data:
    139            while self.conn.local_flow_control_window(stream_id) < 1:
    140                try:
    141                    await self.wait_for_flow_control(stream_id)
    142                except asyncio.CancelledError:
    143                    return
    144 
    145            chunk_size = min(
    146                self.conn.local_flow_control_window(stream_id),
    147                len(data),
    148                self.conn.max_outbound_frame_size,
    149            )
    150 
    151            try:
    152                self.conn.send_data(
    153                    stream_id,
    154                    data[:chunk_size],
    155                    end_stream=(chunk_size == len(data))
    156                )
    157            except (StreamClosedError, ProtocolError):
    158                # The stream got closed and we didn't get told. We're done
    159                # here.
    160                break
    161 
    162            self.transport.write(self.conn.data_to_send())
    163            data = data[chunk_size:]
    164 
    165    async def wait_for_flow_control(self, stream_id):
    166        """
    167        Waits for a Future that fires when the flow control window is opened.
    168        """
    169        f = asyncio.Future()
    170        self.flow_control_futures[stream_id] = f
    171        await f
    172 
    173    def window_updated(self, stream_id, delta):
    174        """
    175        A window update frame was received. Unblock some number of flow control
    176        Futures.
    177        """
    178        if stream_id and stream_id in self.flow_control_futures:
    179            f = self.flow_control_futures.pop(stream_id)
    180            f.set_result(delta)
    181        elif not stream_id:
    182            for f in self.flow_control_futures.values():
    183                f.set_result(delta)
    184 
    185            self.flow_control_futures = {}
    186 
    187 
    188 ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
    189 ssl_context.options |= (
    190    ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1 | ssl.OP_NO_COMPRESSION
    191 )
    192 ssl_context.load_cert_chain(certfile="cert.crt", keyfile="cert.key")
    193 ssl_context.set_alpn_protocols(["h2"])
    194 
    195 loop = asyncio.get_event_loop()
    196 # Each client connection will create a new protocol instance
    197 coro = loop.create_server(H2Protocol, '127.0.0.1', 8443, ssl=ssl_context)
    198 server = loop.run_until_complete(coro)
    199 
    200 # Serve requests until Ctrl+C is pressed
    201 print('Serving on {}'.format(server.sockets[0].getsockname()))
    202 try:
    203    loop.run_forever()
    204 except KeyboardInterrupt:
    205    pass
    206 
    207 # Close the server
    208 server.close()
    209 loop.run_until_complete(server.wait_closed())
    210 loop.close()