tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

server.py (4527B)


      1 #!/usr/bin/env python
      2 
      3 import asyncio
      4 import functools
      5 import os
      6 import sys
      7 import time
      8 
      9 import websockets
     10 
     11 
# Registry of connected clients, shared by handler() and broadcast().
# Holds websocket connections for the "default", "naive", "task" and "wait"
# methods, and per-connection asyncio.Queue objects for the "queue" method.
CLIENTS = set()
     13 
     14 
     15 async def send(websocket, message):
     16    try:
     17        await websocket.send(message)
     18    except websockets.ConnectionClosed:
     19        pass
     20 
     21 
     22 async def relay(queue, websocket):
     23    while True:
     24        message = await queue.get()
     25        await websocket.send(message)
     26 
     27 
     28 class PubSub:
     29    def __init__(self):
     30        self.waiter = asyncio.Future()
     31 
     32    def publish(self, value):
     33        waiter, self.waiter = self.waiter, asyncio.Future()
     34        waiter.set_result((value, self.waiter))
     35 
     36    async def subscribe(self):
     37        waiter = self.waiter
     38        while True:
     39            value, waiter = await waiter
     40            yield value
     41 
     42    __aiter__ = subscribe
     43 
     44 
# Shared channel for the "pubsub" method: broadcast() publishes each message
# to it and handler() iterates over it to forward messages to a connection.
PUBSUB = PubSub()
     46 
     47 
     48 async def handler(websocket, method=None):
     49    if method in ["default", "naive", "task", "wait"]:
     50        CLIENTS.add(websocket)
     51        try:
     52            await websocket.wait_closed()
     53        finally:
     54            CLIENTS.remove(websocket)
     55    elif method == "queue":
     56        queue = asyncio.Queue()
     57        relay_task = asyncio.create_task(relay(queue, websocket))
     58        CLIENTS.add(queue)
     59        try:
     60            await websocket.wait_closed()
     61        finally:
     62            CLIENTS.remove(queue)
     63            relay_task.cancel()
     64    elif method == "pubsub":
     65        async for message in PUBSUB:
     66            await websocket.send(message)
     67    else:
     68        raise NotImplementedError(f"unsupported method: {method}")
     69 
     70 
     71 async def broadcast(method, size, delay):
     72    """Broadcast messages at regular intervals."""
     73    load_average = 0
     74    time_average = 0
     75    pc1, pt1 = time.perf_counter_ns(), time.process_time_ns()
     76    await asyncio.sleep(delay)
     77    while True:
     78        print(f"clients = {len(CLIENTS)}")
     79        pc0, pt0 = time.perf_counter_ns(), time.process_time_ns()
     80        load_average = 0.9 * load_average + 0.1 * (pt0 - pt1) / (pc0 - pc1)
     81        print(
     82            f"load = {(pt0 - pt1) / (pc0 - pc1) * 100:.1f}% / "
     83            f"average = {load_average * 100:.1f}%, "
     84            f"late = {(pc0 - pc1 - delay * 1e9) / 1e6:.1f} ms"
     85        )
     86        pc1, pt1 = pc0, pt0
     87 
     88        assert size > 20
     89        message = str(time.time_ns()).encode() + b" " + os.urandom(size - 20)
     90 
     91        if method == "default":
     92            websockets.broadcast(CLIENTS, message)
     93        elif method == "naive":
     94            # Since the loop can yield control, make a copy of CLIENTS
     95            # to avoid: RuntimeError: Set changed size during iteration
     96            for websocket in CLIENTS.copy():
     97                await send(websocket, message)
     98        elif method == "task":
     99            for websocket in CLIENTS:
    100                asyncio.create_task(send(websocket, message))
    101        elif method == "wait":
    102            if CLIENTS:  # asyncio.wait doesn't accept an empty list
    103                await asyncio.wait(
    104                    [
    105                        asyncio.create_task(send(websocket, message))
    106                        for websocket in CLIENTS
    107                    ]
    108                )
    109        elif method == "queue":
    110            for queue in CLIENTS:
    111                queue.put_nowait(message)
    112        elif method == "pubsub":
    113            PUBSUB.publish(message)
    114        else:
    115            raise NotImplementedError(f"unsupported method: {method}")
    116 
    117        pc2 = time.perf_counter_ns()
    118        wait = delay + (pc1 - pc2) / 1e9
    119        time_average = 0.9 * time_average + 0.1 * (pc2 - pc1)
    120        print(
    121            f"broadcast = {(pc2 - pc1) / 1e6:.1f}ms / "
    122            f"average = {time_average / 1e6:.1f}ms, "
    123            f"wait = {wait * 1e3:.1f}ms"
    124        )
    125        await asyncio.sleep(wait)
    126        print()
    127 
    128 
    129 async def main(method, size, delay):
    130    async with websockets.serve(
    131        functools.partial(handler, method=method),
    132        "localhost",
    133        8765,
    134        compression=None,
    135        ping_timeout=None,
    136    ):
    137        await broadcast(method, size, delay)
    138 
    139 
    140 if __name__ == "__main__":
    141    try:
    142        method = sys.argv[1]
    143        assert method in ["default", "naive", "task", "wait", "queue", "pubsub"]
    144        size = int(sys.argv[2])
    145        delay = float(sys.argv[3])
    146    except Exception as exc:
    147        print(f"Usage: {sys.argv[0]} method size delay")
    148        print("    Start a server broadcasting messages with <method> e.g. naive")
    149        print("    Send a payload of <size> bytes every <delay> seconds")
    150        print()
    151        print(exc)
    152    else:
    153        asyncio.run(main(method, size, delay))