twisted-server.py (5794B)
1 # -*- coding: utf-8 -*- 2 """ 3 twisted-server.py 4 ~~~~~~~~~~~~~~~~~ 5 6 A fully-functional HTTP/2 server written for Twisted. 7 """ 8 import functools 9 import mimetypes 10 import os 11 import os.path 12 import sys 13 14 from OpenSSL import crypto 15 from twisted.internet.defer import Deferred, inlineCallbacks 16 from twisted.internet.protocol import Protocol, Factory 17 from twisted.internet import endpoints, reactor, ssl 18 from h2.config import H2Configuration 19 from h2.connection import H2Connection 20 from h2.events import ( 21 RequestReceived, DataReceived, WindowUpdated 22 ) 23 from h2.exceptions import ProtocolError 24 25 26 def close_file(file, d): 27 file.close() 28 29 30 READ_CHUNK_SIZE = 8192 31 32 33 class H2Protocol(Protocol): 34 def __init__(self, root): 35 config = H2Configuration(client_side=False) 36 self.conn = H2Connection(config=config) 37 self.known_proto = None 38 self.root = root 39 40 self._flow_control_deferreds = {} 41 42 def connectionMade(self): 43 self.conn.initiate_connection() 44 self.transport.write(self.conn.data_to_send()) 45 46 def dataReceived(self, data): 47 if not self.known_proto: 48 self.known_proto = True 49 50 try: 51 events = self.conn.receive_data(data) 52 except ProtocolError: 53 if self.conn.data_to_send: 54 self.transport.write(self.conn.data_to_send()) 55 self.transport.loseConnection() 56 else: 57 for event in events: 58 if isinstance(event, RequestReceived): 59 self.requestReceived(event.headers, event.stream_id) 60 elif isinstance(event, DataReceived): 61 self.dataFrameReceived(event.stream_id) 62 elif isinstance(event, WindowUpdated): 63 self.windowUpdated(event) 64 65 if self.conn.data_to_send: 66 self.transport.write(self.conn.data_to_send()) 67 68 def requestReceived(self, headers, stream_id): 69 headers = dict(headers) # Invalid conversion, fix later. 70 assert headers[b':method'] == b'GET' 71 72 path = headers[b':path'].lstrip(b'/') 73 full_path = os.path.join(self.root, path) 74 75 if not os.path.exists(full_path): 76 response_headers = ( 77 (':status', '404'), 78 ('content-length', '0'), 79 ('server', 'twisted-h2'), 80 ) 81 self.conn.send_headers( 82 stream_id, response_headers, end_stream=True 83 ) 84 self.transport.write(self.conn.data_to_send()) 85 else: 86 self.sendFile(full_path, stream_id) 87 88 return 89 90 def dataFrameReceived(self, stream_id): 91 self.conn.reset_stream(stream_id) 92 self.transport.write(self.conn.data_to_send()) 93 94 def sendFile(self, file_path, stream_id): 95 filesize = os.stat(file_path).st_size 96 content_type, content_encoding = mimetypes.guess_type( 97 file_path.decode('utf-8') 98 ) 99 response_headers = [ 100 (':status', '200'), 101 ('content-length', str(filesize)), 102 ('server', 'twisted-h2'), 103 ] 104 if content_type: 105 response_headers.append(('content-type', content_type)) 106 if content_encoding: 107 response_headers.append(('content-encoding', content_encoding)) 108 109 self.conn.send_headers(stream_id, response_headers) 110 self.transport.write(self.conn.data_to_send()) 111 112 f = open(file_path, 'rb') 113 d = self._send_file(f, stream_id) 114 d.addErrback(functools.partial(close_file, f)) 115 116 def windowUpdated(self, event): 117 """ 118 Handle a WindowUpdated event by firing any waiting data sending 119 callbacks. 120 """ 121 stream_id = event.stream_id 122 123 if stream_id and stream_id in self._flow_control_deferreds: 124 d = self._flow_control_deferreds.pop(stream_id) 125 d.callback(event.delta) 126 elif not stream_id: 127 for d in self._flow_control_deferreds.values(): 128 d.callback(event.delta) 129 130 self._flow_control_deferreds = {} 131 132 return 133 134 @inlineCallbacks 135 def _send_file(self, file, stream_id): 136 """ 137 This callback sends more data for a given file on the stream. 138 """ 139 keep_reading = True 140 while keep_reading: 141 while not self.conn.remote_flow_control_window(stream_id): 142 yield self.wait_for_flow_control(stream_id) 143 144 chunk_size = min( 145 self.conn.remote_flow_control_window(stream_id), READ_CHUNK_SIZE 146 ) 147 data = file.read(chunk_size) 148 keep_reading = len(data) == chunk_size 149 self.conn.send_data(stream_id, data, not keep_reading) 150 self.transport.write(self.conn.data_to_send()) 151 152 if not keep_reading: 153 break 154 155 file.close() 156 157 def wait_for_flow_control(self, stream_id): 158 """ 159 Returns a Deferred that fires when the flow control window is opened. 160 """ 161 d = Deferred() 162 self._flow_control_deferreds[stream_id] = d 163 return d 164 165 166 class H2Factory(Factory): 167 def __init__(self, root): 168 self.root = root 169 170 def buildProtocol(self, addr): 171 print(H2Protocol) 172 return H2Protocol(self.root) 173 174 175 root = sys.argv[1].encode('utf-8') 176 177 with open('server.crt', 'r') as f: 178 cert_data = f.read() 179 with open('server.key', 'r') as f: 180 key_data = f.read() 181 182 cert = crypto.load_certificate(crypto.FILETYPE_PEM, cert_data) 183 key = crypto.load_privatekey(crypto.FILETYPE_PEM, key_data) 184 options = ssl.CertificateOptions( 185 privateKey=key, 186 certificate=cert, 187 acceptableProtocols=[b'h2'], 188 ) 189 190 endpoint = endpoints.SSL4ServerEndpoint(reactor, 8080, options, backlog=128) 191 endpoint.listen(H2Factory(root)) 192 reactor.run()