test_server.py (537B)
"""Minimal WebSocket echo server for manual testing.

Listens on HOST:PORT and sends every received message back to its sender.
Run it as a script and stop it with Ctrl-C.
"""

import asyncio
import logging

import websockets


logging.basicConfig(level=logging.WARNING)

# Uncomment this line to make only websockets more verbose.
# logging.getLogger('websockets').setLevel(logging.DEBUG)


HOST, PORT = "127.0.0.1", 8642


async def echo(ws):
    """Send each message received on connection *ws* straight back on it."""
    async for msg in ws:
        await ws.send(msg)


async def main():
    """Start the echo server on HOST:PORT and keep it running.

    The coroutine never returns on its own; it ends only when its task is
    cancelled (for example when the process receives Ctrl-C).
    """
    # ``websockets.serve`` is an *asynchronous* context manager: it must be
    # entered with ``async with``. A plain ``with`` raises before the server
    # ever starts listening.
    # max_size is 2 ** 25 = 33_554_432 (32 MiB if the unit is bytes).
    async with websockets.serve(
        echo, HOST, PORT, max_size=2 ** 25, max_queue=1
    ):
        # Awaiting a future that is never resolved blocks forever, which
        # keeps the ``async with`` block (and so the server) alive.
        await asyncio.Future()


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl-C surfaces from asyncio.run(), not inside the coroutine
        # (which only sees a cancellation), so the quiet-exit handling
        # has to live here.
        pass