app.py (5644B)
1 #!/usr/bin/env python 2 3 import asyncio 4 import http 5 import http.cookies 6 import pathlib 7 import signal 8 import urllib.parse 9 import uuid 10 11 import websockets 12 from websockets.frames import CloseCode 13 14 15 # User accounts database 16 17 USERS = {} 18 19 20 def create_token(user, lifetime=1): 21 """Create token for user and delete it once its lifetime is over.""" 22 token = uuid.uuid4().hex 23 USERS[token] = user 24 asyncio.get_running_loop().call_later(lifetime, USERS.pop, token) 25 return token 26 27 28 def get_user(token): 29 """Find user authenticated by token or return None.""" 30 return USERS.get(token) 31 32 33 # Utilities 34 35 36 def get_cookie(raw, key): 37 cookie = http.cookies.SimpleCookie(raw) 38 morsel = cookie.get(key) 39 if morsel is not None: 40 return morsel.value 41 42 43 def get_query_param(path, key): 44 query = urllib.parse.urlparse(path).query 45 params = urllib.parse.parse_qs(query) 46 values = params.get(key, []) 47 if len(values) == 1: 48 return values[0] 49 50 51 # Main HTTP server 52 53 CONTENT_TYPES = { 54 ".css": "text/css", 55 ".html": "text/html; charset=utf-8", 56 ".ico": "image/x-icon", 57 ".js": "text/javascript", 58 } 59 60 61 async def serve_html(path, request_headers): 62 user = get_query_param(path, "user") 63 path = urllib.parse.urlparse(path).path 64 if path == "/": 65 if user is None: 66 page = "index.html" 67 else: 68 page = "test.html" 69 else: 70 page = path[1:] 71 72 try: 73 template = pathlib.Path(__file__).with_name(page) 74 except ValueError: 75 pass 76 else: 77 if template.is_file(): 78 headers = {"Content-Type": CONTENT_TYPES[template.suffix]} 79 body = template.read_bytes() 80 if user is not None: 81 token = create_token(user) 82 body = body.replace(b"TOKEN", token.encode()) 83 return http.HTTPStatus.OK, headers, body 84 85 return http.HTTPStatus.NOT_FOUND, {}, b"Not found\n" 86 87 88 async def noop_handler(websocket): 89 pass 90 91 92 # Send credentials as the first message in the WebSocket connection 93 94 95 async def first_message_handler(websocket): 96 token = await websocket.recv() 97 user = get_user(token) 98 if user is None: 99 await websocket.close(CloseCode.INTERNAL_ERROR, "authentication failed") 100 return 101 102 await websocket.send(f"Hello {user}!") 103 message = await websocket.recv() 104 assert message == f"Goodbye {user}." 105 106 107 # Add credentials to the WebSocket URI in a query parameter 108 109 110 class QueryParamProtocol(websockets.WebSocketServerProtocol): 111 async def process_request(self, path, headers): 112 token = get_query_param(path, "token") 113 if token is None: 114 return http.HTTPStatus.UNAUTHORIZED, [], b"Missing token\n" 115 116 user = get_user(token) 117 if user is None: 118 return http.HTTPStatus.UNAUTHORIZED, [], b"Invalid token\n" 119 120 self.user = user 121 122 123 async def query_param_handler(websocket): 124 user = websocket.user 125 126 await websocket.send(f"Hello {user}!") 127 message = await websocket.recv() 128 assert message == f"Goodbye {user}." 129 130 131 # Set a cookie on the domain of the WebSocket URI 132 133 134 class CookieProtocol(websockets.WebSocketServerProtocol): 135 async def process_request(self, path, headers): 136 if "Upgrade" not in headers: 137 template = pathlib.Path(__file__).with_name(path[1:]) 138 headers = {"Content-Type": CONTENT_TYPES[template.suffix]} 139 body = template.read_bytes() 140 return http.HTTPStatus.OK, headers, body 141 142 token = get_cookie(headers.get("Cookie", ""), "token") 143 if token is None: 144 return http.HTTPStatus.UNAUTHORIZED, [], b"Missing token\n" 145 146 user = get_user(token) 147 if user is None: 148 return http.HTTPStatus.UNAUTHORIZED, [], b"Invalid token\n" 149 150 self.user = user 151 152 153 async def cookie_handler(websocket): 154 user = websocket.user 155 156 await websocket.send(f"Hello {user}!") 157 message = await websocket.recv() 158 assert message == f"Goodbye {user}." 159 160 161 # Adding credentials to the WebSocket URI in user information 162 163 164 class UserInfoProtocol(websockets.BasicAuthWebSocketServerProtocol): 165 async def check_credentials(self, username, password): 166 if username != "token": 167 return False 168 169 user = get_user(password) 170 if user is None: 171 return False 172 173 self.user = user 174 return True 175 176 177 async def user_info_handler(websocket): 178 user = websocket.user 179 180 await websocket.send(f"Hello {user}!") 181 message = await websocket.recv() 182 assert message == f"Goodbye {user}." 183 184 185 # Start all five servers 186 187 188 async def main(): 189 # Set the stop condition when receiving SIGINT or SIGTERM. 190 loop = asyncio.get_running_loop() 191 stop = loop.create_future() 192 loop.add_signal_handler(signal.SIGINT, stop.set_result, None) 193 loop.add_signal_handler(signal.SIGTERM, stop.set_result, None) 194 195 async with websockets.serve( 196 noop_handler, 197 host="", 198 port=8000, 199 process_request=serve_html, 200 ), websockets.serve( 201 first_message_handler, 202 host="", 203 port=8001, 204 ), websockets.serve( 205 query_param_handler, 206 host="", 207 port=8002, 208 create_protocol=QueryParamProtocol, 209 ), websockets.serve( 210 cookie_handler, 211 host="", 212 port=8003, 213 create_protocol=CookieProtocol, 214 ), websockets.serve( 215 user_info_handler, 216 host="", 217 port=8004, 218 create_protocol=UserInfoProtocol, 219 ): 220 print("Running on http://localhost:8000/") 221 await stop 222 print("\rExiting") 223 224 225 if __name__ == "__main__": 226 asyncio.run(main())