gevent-server.py (6649B)
# -*- coding: utf-8 -*-
"""
gevent-server.py
================

A simple HTTP/2 server written for gevent serving static files from a directory specified as input.
If no directory is provided, the current directory will be used.
"""
import mimetypes
import sys
from functools import partial
from pathlib import Path
from typing import Tuple, Dict, Optional
from urllib.parse import unquote

from gevent import socket, ssl
from gevent.event import Event
from gevent.server import StreamServer
from h2 import events
from h2.config import H2Configuration
from h2.connection import H2Connection


def get_http2_tls_context() -> ssl.SSLContext:
    """
    Build the server-side TLS context used to negotiate HTTP/2.

    The certificate and key are loaded from ``localhost.crt`` and
    ``localhost.key``, resolved relative to the current working directory.

    :returns: a configured ``SSLContext`` advertising ``h2`` via ALPN.
    """
    ctx = ssl.create_default_context(purpose=ssl.Purpose.CLIENT_AUTH)

    # Refuse every protocol version older than TLS 1.2.
    ctx.options |= (
        ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3 | ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1
    )

    # Disable TLS-level compression.
    ctx.options |= ssl.OP_NO_COMPRESSION
    ctx.set_ciphers('ECDHE+AESGCM:ECDHE+CHACHA20:DHE+AESGCM:DHE+CHACHA20')
    ctx.load_cert_chain(certfile='localhost.crt', keyfile='localhost.key')
    ctx.set_alpn_protocols(['h2'])
    try:
        # NPN is optional: skip it when the ssl module does not implement it.
        ctx.set_npn_protocols(['h2'])
    except NotImplementedError:
        pass

    return ctx


class H2Worker:
    """
    Serve one accepted connection: speak HTTP/2 over ``sock`` and answer
    ``GET`` requests with files found below ``source_dir``.

    Constructing an instance runs the whole connection loop, so the class can
    be handed directly to ``StreamServer`` as its connection handler.
    """

    def __init__(self, sock: socket.socket, address: Tuple[str, int], source_dir: Optional[str] = None):
        """
        :param sock: the accepted (TLS-wrapped) client socket.
        :param address: the peer address of the client.
        :param source_dir: directory to serve files from; the current working
            directory is used when omitted.
        :raises NotADirectoryError: if ``source_dir`` is not a directory.
        """
        self._sock = sock
        self._address = address
        # Streams currently blocked on flow control, keyed by stream id.
        self._flow_control_events: Dict[int, Event] = {}
        self._server_name = 'gevent-h2'
        self._connection: Optional[H2Connection] = None
        self._read_chunk_size = 8192  # The maximum amount of a file we'll send in a single DATA frame

        # Honour the documented fallback instead of failing on Path(None).
        if source_dir is None:
            source_dir = str(Path.cwd())

        self._check_sources_dir(source_dir)
        self._sources_dir = source_dir

        self._run()

    def _initiate_connection(self):
        """
        Create the server-side H2 connection and send its opening bytes.
        """
        config = H2Configuration(client_side=False, header_encoding='utf-8')
        self._connection = H2Connection(config=config)
        self._connection.initiate_connection()
        self._flush()

    def _flush(self) -> None:
        """
        Write whatever the H2 connection has queued to the socket, if anything.
        """
        pending = self._connection.data_to_send()
        if pending:
            self._sock.sendall(pending)

    @staticmethod
    def _check_sources_dir(sources_dir: str) -> None:
        """
        Ensure ``sources_dir`` names an existing directory.

        :raises NotADirectoryError: if it does not.
        """
        p = Path(sources_dir)
        if not p.is_dir():
            raise NotADirectoryError(f'{sources_dir} does not exists')

    def _send_error_response(self, status_code: str, event: events.RequestReceived) -> None:
        """
        Answer the request's stream with a body-less response and close it.

        :param status_code: the HTTP status to send, as a string (e.g. ``'404'``).
        :param event: the request being answered; only its stream id is used.
        """
        self._connection.send_headers(
            stream_id=event.stream_id,
            headers=[
                (':status', status_code),
                ('content-length', '0'),
                ('server', self._server_name),
            ],
            end_stream=True
        )
        self._flush()

    def _resolve_request_path(self, raw_path: str) -> Optional[Path]:
        """
        Map a request ``:path`` onto a regular file inside the served directory.

        The query string is dropped and percent-escapes are decoded. The
        candidate is then fully resolved (``..`` segments and symlinks) and
        must still lie inside the served directory; this is what prevents a
        request such as ``/../secret`` from escaping it.

        :param raw_path: the ``:path`` pseudo-header sent by the client.
        :returns: the resolved file path, or ``None`` when the target is
            outside the served directory, malformed, or not a regular file.
        """
        relative = unquote(raw_path.split('?', 1)[0]).lstrip('/')
        root = Path(self._sources_dir).resolve()
        try:
            candidate = (root / relative).resolve()
            # Raises ValueError when candidate is not located under root.
            candidate.relative_to(root)
        except (ValueError, OSError, RuntimeError):
            # Embedded NUL bytes, unreadable components, symlink loops or a
            # path outside the root: treat all of them as "not found".
            return None

        return candidate if candidate.is_file() else None

    def _handle_request(self, event: events.RequestReceived) -> None:
        """
        Serve one request: 405 for anything but ``GET``, 404 when the path
        does not name a servable file, otherwise the file contents.
        """
        headers = dict(event.headers)
        if headers[':method'] != 'GET':
            self._send_error_response('405', event)
            return

        file_path = self._resolve_request_path(headers[':path'])
        if file_path is None:
            self._send_error_response('404', event)
            return

        self._send_file(file_path, event.stream_id)

    def _send_file(self, file_path: Path, stream_id: int) -> None:
        """
        Send a file, obeying the rules of HTTP/2 flow control.

        Response headers carry the file size and, when ``mimetypes`` can guess
        them, the content type and content encoding.
        """
        file_size = file_path.stat().st_size
        content_type, content_encoding = mimetypes.guess_type(str(file_path))
        response_headers = [
            (':status', '200'),
            ('content-length', str(file_size)),
            ('server', self._server_name)
        ]
        if content_type:
            response_headers.append(('content-type', content_type))
        if content_encoding:
            response_headers.append(('content-encoding', content_encoding))

        self._connection.send_headers(stream_id, response_headers)
        self._flush()

        with file_path.open(mode='rb', buffering=0) as f:
            self._send_file_data(f, stream_id)

    def _send_file_data(self, file_obj, stream_id: int) -> None:
        """
        Send the data portion of a file. Handles flow control rules.

        Each DATA frame carries at most ``self._read_chunk_size`` bytes and
        never more than the current flow control window. A read shorter than
        requested is taken as end of file and closes the stream.
        """
        while True:
            while self._connection.local_flow_control_window(stream_id) < 1:
                self._wait_for_flow_control(stream_id)

            chunk_size = min(self._connection.local_flow_control_window(stream_id), self._read_chunk_size)
            data = file_obj.read(chunk_size)
            keep_reading = (len(data) == chunk_size)

            self._connection.send_data(stream_id, data, not keep_reading)
            self._flush()

            if not keep_reading:
                break

    def _wait_for_flow_control(self, stream_id: int) -> None:
        """
        Blocks until the flow control window for a given stream is opened.

        NOTE(review): the event is only ever set from ``_handle_window_update``,
        which is invoked from the receive loop in ``_run``. See the note there
        about calling this from the same greenlet as that loop.
        """
        event = Event()
        self._flow_control_events[stream_id] = event
        event.wait()

    def _handle_window_update(self, event: events.WindowUpdated) -> None:
        """
        Unblock streams waiting on flow control, if needed.

        A non-zero stream id wakes only that stream; stream id 0 (the
        connection-level window) wakes every stream blocked at this moment.
        """
        stream_id = event.stream_id

        if stream_id:
            waiter = self._flow_control_events.pop(stream_id, None)
            if waiter is not None:
                waiter.set()
            return

        # Swap in a fresh dict so that only the events present at this time
        # are woken.
        blocked, self._flow_control_events = self._flow_control_events, {}
        for waiter in blocked.values():
            waiter.set()

    def _run(self) -> None:
        """
        Drive the connection: read from the socket, feed the bytes to h2 and
        dispatch the resulting events until the peer closes the connection.

        Request bodies are not supported: any received DATA frame resets its
        stream.

        NOTE(review): ``_handle_request`` runs inline in this loop. If a
        response blocks in ``_wait_for_flow_control``, this loop is no longer
        reading the socket, so the WINDOW_UPDATE that would unblock it cannot
        be processed. Consider dispatching each request to its own greenlet
        (with socket writes serialised) so large files do not stall.
        """
        self._initiate_connection()

        while True:
            data = self._sock.recv(65535)
            if not data:
                break

            h2_events = self._connection.receive_data(data)
            for event in h2_events:
                if isinstance(event, events.RequestReceived):
                    self._handle_request(event)
                elif isinstance(event, events.DataReceived):
                    self._connection.reset_stream(event.stream_id)
                elif isinstance(event, events.WindowUpdated):
                    self._handle_window_update(event)

            self._flush()


def main() -> None:
    """
    Serve the directory given as the first command-line argument (default:
    the current working directory) over HTTP/2 on 127.0.0.1:8080.
    """
    files_dir = sys.argv[1] if len(sys.argv) > 1 else str(Path.cwd())
    server = StreamServer(('127.0.0.1', 8080), partial(H2Worker, source_dir=files_dir),
                          ssl_context=get_http2_tls_context())
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        server.close()


if __name__ == '__main__':
    main()