msg_channel_wsh.py (7837B)
1 #!/usr/bin/python 2 import json 3 import logging 4 import urllib 5 import threading 6 import traceback 7 from queue import Empty 8 9 from pywebsocket3 import stream, msgutil 10 from wptserve import stash as stashmod 11 12 logger = logging.getLogger() 13 14 address, authkey = stashmod.load_env_config() 15 stash = stashmod.Stash("msg_channel", address=address, authkey=authkey) 16 17 # Backend for websocket based channels. 18 # 19 # Each socket connection has a uuid identifying the channel and a 20 # direction which is either "read" or "write". There can be only 1 21 # "read" channel per uuid, but multiple "write" channels 22 # (i.e. multiple producer, single consumer). 23 # 24 # The websocket connection URL contains the uuid and the direction as 25 # named query parameters. 26 # 27 # Channels are backed by a queue which is stored in the stash (one 28 # queue per uuid). 29 # 30 # The representation of a queue in the stash is a tuple (queue, 31 # has_reader, writer_count). The first field is the queue itself, the 32 # latter are effectively reference counts for reader channels (which 33 # is zero or one, represented by a bool) and writer channels. Once 34 # both counts drop to zero the queue can be deleted. 35 # 36 # Entries on the queue itself are formed of (command, data) pairs. The 37 # command can be either "close", signalling the socket is closing and 38 # the reference count on the channel should be decremented, or 39 # "message", which indicates a message. 
def log(uuid, msg, level="debug"):
    """Log a message prefixed with the channel uuid.

    Params:
    uuid - the UUID of the channel the message relates to.
    msg - the text to log.
    level - name of the logger method to call, e.g. "debug" or "warning".
    """
    msg = f"{uuid}: {msg}"
    getattr(logger, level)(msg)


def web_socket_do_extra_handshake(request):
    """Handshake hook; no additional checks are performed here."""
    return


def web_socket_transfer_data(request):
    """Handle opening a websocket connection.

    Registers the connection against the channel named in the request
    URL (creating the channel's queue on first use), runs the read or
    write loop, and finally releases the connection's reference on the
    channel.

    Any direction other than "read" is counted as a writer for
    reference counting purposes, but only "read" and "write" run a loop.

    Raises:
    ValueError - if a reader is already registered for the channel.
    """

    uuid, direction = parse_request(request)
    log(uuid, f"Got web_socket_transfer_data {direction}")

    # Get or create the relevant queue from the stash and update the refcount
    with stash.lock:
        value = stash.take(uuid)
        if value is None:
            queue = stash.get_queue()
            has_reader = direction == "read"
            writer_count = 0 if has_reader else 1
        else:
            queue, has_reader, writer_count = value
            if direction == "read":
                if has_reader:
                    # take() removed the entry; restore it untouched so
                    # rejecting this connection does not destroy the
                    # channel state of the existing reader and writers.
                    stash.put(uuid, value)
                    raise ValueError("Tried to start multiple readers for the same queue")
                has_reader = True
            else:
                writer_count += 1

        stash.put(uuid, (queue, has_reader, writer_count))

    try:
        if direction == "read":
            run_read(request, uuid, queue)
        elif direction == "write":
            run_write(request, uuid, queue)
        log(uuid, f"transfer_data loop exited {direction}")
    finally:
        # Always drop this connection's reference, even when the loop
        # raised; otherwise the channel would keep a stale reader flag
        # or writer count.
        close_channel(uuid, direction)


def web_socket_passive_closing_handshake(request):
    """Handle a client initiated close.

    When the client closes a reader, put a message in the message
    queue indicating the close. For a writer we don't need special
    handling here because receive_message in run_write will return an
    empty message in this case, so that loop will exit on its own.

    Returns the (close code, close reason) pair taken from the request.
    """
    uuid, direction = parse_request(request)
    log(uuid, f"Got web_socket_passive_closing_handshake {direction}")

    if direction == "read":
        with stash.lock:
            data = stash.take(uuid)
            if data is not None:
                # We only wanted to look at the entry; put it back as-is.
                # When the channel is already gone there is nothing to
                # restore, so nothing is written back.
                stash.put(uuid, data)
        if data is not None:
            queue = data[0]
            queue.put(("close", None))

    return request.ws_close_code, request.ws_close_reason


def parse_request(request):
    """Extract the channel uuid and direction from the request URL.

    Params:
    request - object whose `unparsed_uri` holds the request URI,
              including the query string.

    Returns a (uuid, direction) tuple of the raw query parameter values.

    Raises:
    KeyError - if the "uuid" or "direction" query parameter is missing
               (including when the URI has no query string at all).
    """
    # Import the submodule explicitly: a bare `import urllib` does not
    # guarantee that `urllib.parse` has been loaded.
    from urllib.parse import parse_qsl

    # partition() keeps everything after the first "?", so a "?" inside a
    # parameter value is preserved and a URI without a query yields "".
    _, _, query = request.unparsed_uri.partition("?")
    params = dict(parse_qsl(query))
    return params["uuid"], params["direction"]


def wait_for_close(request, uuid, queue):
    """Listen for messages on the socket for a read connection to a channel.

    Runs until the client sends a "close" command, the stream yields
    None, or receiving fails. Afterwards a ("close", None) entry is put
    on the queue unless the server side has already terminated.
    """
    closed = False
    while not closed:
        try:
            msg = request.ws_stream.receive_message()
            if msg is None:
                break
            try:
                cmd, _ = json.loads(msg)
            except ValueError:
                # Not valid JSON, or not a two-item sequence.
                cmd = None
            if cmd == "close":
                closed = True
                log(uuid, "Got client initiated close")
            else:
                log(uuid, f"Unexpected message on read socket {msg}", "warning")
        except Exception:
            # Errors are expected once either side has terminated; only
            # report the ones that happen on a live connection.
            if not (request.server_terminated or request.client_terminated):
                log(uuid, f"Got exception in wait_for_close\n{traceback.format_exc()}")
            closed = True

    if not request.server_terminated:
        queue.put(("close", None))


def run_read(request, uuid, queue):
    """Main loop for a read-type connection.

    This mostly just listens on the queue for new messages of the
    form (message, data). Supported messages are:
     message - Send `data` on the WebSocket
     close - Close the reader queue

    In addition there's a thread that listens for messages on the
    socket itself. Typically this socket shouldn't receive any
    messages, but it can receive an explicit "close" message,
    indicating the socket should be disconnected.
    """

    close_thread = threading.Thread(target=wait_for_close, args=(request, uuid, queue), daemon=True)
    close_thread.start()

    while True:
        try:
            # Time out every second so a terminated connection is noticed
            # even when nothing arrives on the queue.
            data = queue.get(True, 1)
        except Empty:
            if request.server_terminated or request.client_terminated:
                break
        else:
            cmd, body = data
            log(uuid, f"queue.get ({cmd}, {body})")
            if cmd == "close":
                break
            if cmd == "message":
                msgutil.send_message(request, json.dumps(body))
            else:
                log(uuid, f"Unknown queue command {cmd}", level="warning")


def run_write(request, uuid, queue):
    """Main loop for a write-type connection.

    Messages coming over the socket have the format (command, data).
    The recognised commands are:
      message - Send the message `data` over the channel.
      disconnectReader - Close the reader connection for this channel.
      delete - Force-delete the entire channel and the underlying queue.

    Unrecognised commands are logged and otherwise ignored.
    """
    while True:
        msg = request.ws_stream.receive_message()
        if msg is None:
            break
        cmd, body = json.loads(msg)
        if cmd == "disconnectReader":
            queue.put(("close", None))
        elif cmd == "message":
            log(uuid, f"queue.put ({cmd}, {body})")
            queue.put((cmd, body))
        elif cmd == "delete":
            close_channel(uuid, None)
        else:
            log(uuid, f"Unknown socket command {cmd}", level="warning")


def close_channel(uuid, direction):
    """Update the channel state in the stash when closing a connection

    This updates the stash entry, including refcounts, once a
    connection to a channel is closed.

    Params:
    uuid - the UUID of the channel being closed.
    direction - "read" if a read connection was closed, "write" if a
                write connection was closed, None to remove the
                underlying queue from the stash entirely.

    """
    log(uuid, f"Got close_channel {direction}")
    with stash.lock:
        data = stash.take(uuid)
        if data is None:
            log(uuid, "Message queue already deleted")
            return
        if direction is None:
            # Return without replacing the channel in the stash
            log(uuid, "Force deleting message queue")
            return
        queue, has_reader, writer_count = data
        if direction == "read":
            has_reader = False
        else:
            writer_count -= 1

        # Keep the entry while anything still references the channel or
        # undelivered entries remain on the queue; otherwise leave it
        # removed (take() above already took it out of the stash).
        if has_reader or writer_count > 0 or not queue.empty():
            log(uuid, f"Updating refcount {has_reader}, {writer_count}")
            stash.put(uuid, (queue, has_reader, writer_count))
        else:
            log(uuid, "Deleting message queue")