server.py (4527B)
#!/usr/bin/env python

import asyncio
import functools
import os
import sys
import time

import websockets


# Broadcast strategies accepted on the command line.
METHODS = ("default", "naive", "task", "wait", "queue", "pubsub")

# Connected websockets, or per-connection queues when method is "queue".
CLIENTS = set()

# Strong references to fire-and-forget tasks created by the "task" method.
# The event loop only keeps weak references to tasks, so a task that nobody
# else references may be garbage collected before it finishes.
BACKGROUND_TASKS = set()


async def send(websocket, message):
    """Send message to websocket, ignoring a closed connection."""
    try:
        await websocket.send(message)
    except websockets.ConnectionClosed:
        pass


async def relay(queue, websocket):
    """Forward messages from queue to websocket until the connection closes.

    The task is normally cancelled by the connection handler. If the
    connection closes while a message is being sent, return quietly instead
    of leaving an exception that is never retrieved from the task.
    """
    while True:
        message = await queue.get()
        try:
            await websocket.send(message)
        except websockets.ConnectionClosed:
            return


class PubSub:
    """Publish/subscribe channel built on a chain of futures.

    Each published value resolves the current future with a tuple
    ``(value, next_future)``; subscribers follow the chain, so every
    subscriber sees every value published after it started waiting.
    """

    def __init__(self):
        # Created lazily: a future must belong to the loop that awaits it,
        # and no loop is running yet when the module-level instance is built.
        self.waiter = None

    def _current_waiter(self):
        """Return the pending future, creating it on the running loop."""
        if self.waiter is None:
            self.waiter = asyncio.get_running_loop().create_future()
        return self.waiter

    def publish(self, value):
        """Deliver value to all current subscribers.

        Must be called while an event loop is running.
        """
        waiter = self._current_waiter()
        self.waiter = waiter.get_loop().create_future()
        waiter.set_result((value, self.waiter))

    async def subscribe(self):
        """Yield every value published from now on."""
        waiter = self._current_waiter()
        while True:
            value, waiter = await waiter
            yield value

    __aiter__ = subscribe


PUBSUB = PubSub()


async def handler(websocket, method=None):
    """Register a connection according to the broadcast method.

    Raises NotImplementedError if method isn't supported.
    """
    if method in ["default", "naive", "task", "wait"]:
        CLIENTS.add(websocket)
        try:
            await websocket.wait_closed()
        finally:
            CLIENTS.remove(websocket)
    elif method == "queue":
        queue = asyncio.Queue()
        relay_task = asyncio.create_task(relay(queue, websocket))
        CLIENTS.add(queue)
        try:
            await websocket.wait_closed()
        finally:
            CLIENTS.remove(queue)
            relay_task.cancel()
    elif method == "pubsub":
        try:
            async for message in PUBSUB:
                await websocket.send(message)
        except websockets.ConnectionClosed:
            # Same policy as send(): a closed connection isn't an error.
            pass
    else:
        raise NotImplementedError(f"unsupported method: {method}")


async def broadcast(method, size, delay):
    """Broadcast messages at regular intervals.

    Every delay seconds, send a payload of size bytes to all clients with
    the given method, and print load and timing statistics.

    Raises ValueError if size isn't greater than 20 and NotImplementedError
    if method isn't supported.
    """
    # The payload starts with a timestamp and a space; the rest is random.
    # Validate explicitly: an assert would be stripped under ``python -O``.
    if size <= 20:
        raise ValueError(f"size must be greater than 20, got {size}")

    load_average = 0
    time_average = 0
    pc1, pt1 = time.perf_counter_ns(), time.process_time_ns()
    await asyncio.sleep(delay)
    while True:
        print(f"clients = {len(CLIENTS)}")
        pc0, pt0 = time.perf_counter_ns(), time.process_time_ns()
        # CPU time over wall-clock time since the previous iteration.
        load = (pt0 - pt1) / (pc0 - pc1)
        load_average = 0.9 * load_average + 0.1 * load
        print(
            f"load = {load * 100:.1f}% / "
            f"average = {load_average * 100:.1f}%, "
            f"late = {(pc0 - pc1 - delay * 1e9) / 1e6:.1f} ms"
        )
        pc1, pt1 = pc0, pt0

        message = str(time.time_ns()).encode() + b" " + os.urandom(size - 20)

        if method == "default":
            websockets.broadcast(CLIENTS, message)
        elif method == "naive":
            # Since the loop can yield control, make a copy of CLIENTS
            # to avoid: RuntimeError: Set changed size during iteration
            for websocket in CLIENTS.copy():
                await send(websocket, message)
        elif method == "task":
            # No await in this loop, so CLIENTS cannot change under us.
            for websocket in CLIENTS:
                task = asyncio.create_task(send(websocket, message))
                # Keep the task alive until it completes.
                BACKGROUND_TASKS.add(task)
                task.add_done_callback(BACKGROUND_TASKS.discard)
        elif method == "wait":
            if CLIENTS:  # asyncio.wait doesn't accept an empty list
                await asyncio.wait(
                    [
                        asyncio.create_task(send(websocket, message))
                        for websocket in CLIENTS
                    ]
                )
        elif method == "queue":
            for queue in CLIENTS:
                queue.put_nowait(message)
        elif method == "pubsub":
            PUBSUB.publish(message)
        else:
            raise NotImplementedError(f"unsupported method: {method}")

        pc2 = time.perf_counter_ns()
        # Sleep for what remains of the interval after broadcasting.
        wait = delay + (pc1 - pc2) / 1e9
        time_average = 0.9 * time_average + 0.1 * (pc2 - pc1)
        print(
            f"broadcast = {(pc2 - pc1) / 1e6:.1f}ms / "
            f"average = {time_average / 1e6:.1f}ms, "
            f"wait = {wait * 1e3:.1f}ms"
        )
        await asyncio.sleep(wait)
        print()


async def main(method, size, delay):
    """Serve on localhost:8765 and broadcast until interrupted."""
    async with websockets.serve(
        functools.partial(handler, method=method),
        "localhost",
        8765,
        compression=None,
        ping_timeout=None,
    ):
        await broadcast(method, size, delay)


if __name__ == "__main__":
    try:
        method = sys.argv[1]
        # Explicit check rather than assert, which ``python -O`` removes.
        if method not in METHODS:
            raise ValueError(f"unsupported method: {method}")
        size = int(sys.argv[2])
        delay = float(sys.argv[3])
    except Exception as exc:
        print(f"Usage: {sys.argv[0]} method size delay")
        print(" Start a server broadcasting messages with <method> e.g. naive")
        print(" Send a payload of <size> bytes every <delay> seconds")
        print()
        print(exc)
    else:
        asyncio.run(main(method, size, delay))