asyncio-server.py (7261B)
# -*- coding: utf-8 -*-
"""
asyncio-server.py
~~~~~~~~~~~~~~~~~

A fully-functional HTTP/2 server using asyncio. Requires Python 3.5+.

This example demonstrates handling requests with bodies, as well as handling
those without. In particular, it demonstrates the fact that a DataReceived
event may be delivered multiple times for a single request, and that
applications must handle that possibility.
"""
import asyncio
import io
import json
import ssl
import collections
from typing import List, Tuple

from h2.config import H2Configuration
from h2.connection import H2Connection
from h2.events import (
    ConnectionTerminated, DataReceived, RemoteSettingsChanged,
    RequestReceived, StreamEnded, StreamReset, WindowUpdated
)
from h2.errors import ErrorCodes
from h2.exceptions import ProtocolError, StreamClosedError
from h2.settings import SettingCodes


# Per-stream request state: the request headers and a buffer for the body.
RequestData = collections.namedtuple('RequestData', ['headers', 'data'])


class H2Protocol(asyncio.Protocol):
    """
    An asyncio protocol that answers every HTTP/2 request with a JSON document
    echoing the request's headers and body.

    One instance is created per client connection.
    """

    def __init__(self):
        config = H2Configuration(client_side=False, header_encoding='utf-8')
        self.conn = H2Connection(config=config)
        self.transport = None
        # stream ID -> RequestData for requests that have not completed yet.
        self.stream_data = {}
        # stream ID -> Future awaited by a send_data() call that is blocked on
        # flow control for that stream.
        self.flow_control_futures = {}

    def connection_made(self, transport: asyncio.Transport):
        """
        Store the transport and send the HTTP/2 connection preamble.
        """
        self.transport = transport
        self.conn.initiate_connection()
        self.transport.write(self.conn.data_to_send())

    def connection_lost(self, exc):
        """
        The connection is gone: wake up (by cancelling) every sender that is
        still waiting on flow control and drop all per-stream state.
        """
        for future in self.flow_control_futures.values():
            future.cancel()
        self.flow_control_futures = {}
        self.stream_data = {}

    def data_received(self, data: bytes):
        """
        Feed bytes from the transport to h2 and dispatch the resulting events.
        """
        try:
            events = self.conn.receive_data(data)
        except ProtocolError:
            # Flush whatever h2 queued in response to the error, then give up
            # on the connection.
            self.transport.write(self.conn.data_to_send())
            self.transport.close()
            return

        self.transport.write(self.conn.data_to_send())
        for event in events:
            if isinstance(event, RequestReceived):
                self.request_received(event.headers, event.stream_id)
            elif isinstance(event, DataReceived):
                self.receive_data(event.data, event.stream_id)
                # Hand the consumed bytes back to the peer's flow control
                # window. Without this the window is never re-opened and
                # request bodies larger than the initial window stall.
                self.conn.acknowledge_received_data(
                    event.flow_controlled_length, event.stream_id
                )
            elif isinstance(event, StreamEnded):
                self.stream_complete(event.stream_id)
            elif isinstance(event, ConnectionTerminated):
                self.transport.close()
            elif isinstance(event, StreamReset):
                self.stream_reset(event.stream_id)
            elif isinstance(event, WindowUpdated):
                self.window_updated(event.stream_id, event.delta)
            elif isinstance(event, RemoteSettingsChanged):
                # A new initial window size can unblock any stream, so treat
                # it like a connection-wide window update.
                if SettingCodes.INITIAL_WINDOW_SIZE in event.changed_settings:
                    self.window_updated(None, 0)

            # Flush anything the handler above queued.
            self.transport.write(self.conn.data_to_send())

    def request_received(self, headers: List[Tuple[str, str]], stream_id: int):
        """
        A request's headers arrived. Record them, along with an empty body
        buffer, until the stream completes.
        """
        headers = collections.OrderedDict(headers)
        self.stream_data[stream_id] = RequestData(headers, io.BytesIO())

    def stream_complete(self, stream_id: int):
        """
        When a stream is complete, we can send our response.
        """
        try:
            # pop() rather than a plain lookup: the request state is no longer
            # needed once the response is built, and leaving it behind would
            # leak one entry per request for the life of the connection.
            request_data = self.stream_data.pop(stream_id)
        except KeyError:
            # No request is recorded for this stream, so there is nothing to
            # respond to.
            return

        headers = request_data.headers
        # The body is untrusted input and need not be valid UTF-8; replace
        # undecodable bytes instead of raising out of data_received().
        body = request_data.data.getvalue().decode('utf-8', errors='replace')

        data = json.dumps(
            {"headers": headers, "body": body}, indent=4
        ).encode("utf8")

        response_headers = (
            (':status', '200'),
            ('content-type', 'application/json'),
            ('content-length', str(len(data))),
            ('server', 'asyncio-h2'),
        )
        self.conn.send_headers(stream_id, response_headers)
        # The body is sent from a separate task because it may have to wait
        # for the flow control window to open.
        asyncio.ensure_future(self.send_data(data, stream_id))

    def receive_data(self, data: bytes, stream_id: int):
        """
        We've received some data on a stream. If that stream is one we're
        expecting data on, save it off. Otherwise, reset the stream.
        """
        try:
            stream_data = self.stream_data[stream_id]
        except KeyError:
            self.conn.reset_stream(
                stream_id, error_code=ErrorCodes.PROTOCOL_ERROR
            )
        else:
            stream_data.data.write(data)

    def stream_reset(self, stream_id):
        """
        A stream reset was sent. Stop sending data and forget the request.
        """
        future = self.flow_control_futures.pop(stream_id, None)
        if future is not None:
            future.cancel()
        # The request will never complete, so its buffered state is garbage.
        self.stream_data.pop(stream_id, None)

    async def send_data(self, data, stream_id):
        """
        Send data according to the flow control rules.

        The final chunk is sent with END_STREAM set. Returns early if the
        wait for flow control is cancelled or the stream turns out to be
        closed.
        """
        while data:
            while self.conn.local_flow_control_window(stream_id) < 1:
                try:
                    await self.wait_for_flow_control(stream_id)
                except asyncio.CancelledError:
                    # The stream was reset or the connection was lost.
                    return

            chunk_size = min(
                self.conn.local_flow_control_window(stream_id),
                len(data),
                self.conn.max_outbound_frame_size,
            )

            try:
                self.conn.send_data(
                    stream_id,
                    data[:chunk_size],
                    end_stream=(chunk_size == len(data))
                )
            except (StreamClosedError, ProtocolError):
                # The stream got closed and we didn't get told. We're done
                # here.
                break

            self.transport.write(self.conn.data_to_send())
            data = data[chunk_size:]

    async def wait_for_flow_control(self, stream_id):
        """
        Waits for a Future that fires when the flow control window is opened.
        """
        f = asyncio.Future()
        self.flow_control_futures[stream_id] = f
        await f

    def window_updated(self, stream_id, delta):
        """
        A window update frame was received. Unblock some number of flow control
        Futures.

        A falsy ``stream_id`` (``0`` or ``None``) means the update applies to
        the whole connection, so every blocked sender is woken.
        """
        if stream_id and stream_id in self.flow_control_futures:
            f = self.flow_control_futures.pop(stream_id)
            # set_result() raises InvalidStateError on a finished (e.g.
            # cancelled) Future, so check first.
            if not f.done():
                f.set_result(delta)
        elif not stream_id:
            blocked = self.flow_control_futures
            self.flow_control_futures = {}
            for f in blocked.values():
                if not f.done():
                    f.set_result(delta)


ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
ssl_context.options |= (
    ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1 | ssl.OP_NO_COMPRESSION
)
ssl_context.load_cert_chain(certfile="cert.crt", keyfile="cert.key")
ssl_context.set_alpn_protocols(["h2"])

# Create the loop explicitly: calling asyncio.get_event_loop() when no loop is
# current is deprecated in newer Python versions.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
# Each client connection will create a new protocol instance
coro = loop.create_server(H2Protocol, '127.0.0.1', 8443, ssl=ssl_context)
server = loop.run_until_complete(coro)

# Serve requests until Ctrl+C is pressed
print('Serving on {}'.format(server.sockets[0].getsockname()))
try:
    loop.run_forever()
except KeyboardInterrupt:
    pass

# Close the server
server.close()
loop.run_until_complete(server.wait_closed())
loop.close()