Broadcasting messages
=====================

.. currentmodule:: websockets


.. admonition:: If you just want to send a message to all connected clients,
   use :func:`broadcast`.
   :class: tip

   If you want to learn about its design in depth, continue reading this
   document.

WebSocket servers often send the same message to all connected clients or to a
subset of clients for which the message is relevant.

Let's explore options for broadcasting a message, explain the design
of :func:`broadcast`, and discuss alternatives.

For each option, we'll provide a connection handler called ``handler()`` and a
function or coroutine called ``broadcast()`` that sends a message to all
connected clients.

Integrating them is left as an exercise for the reader. You could start with::

   import asyncio
   import websockets

   async def handler(websocket):
       ...

   async def broadcast(message):
       ...

   async def broadcast_messages():
       while True:
           await asyncio.sleep(1)
           message = ...  # your application logic goes here
           await broadcast(message)

   async def main():
       async with websockets.serve(handler, "localhost", 8765):
           await broadcast_messages()  # runs forever

   if __name__ == "__main__":
       asyncio.run(main())

``broadcast_messages()`` must yield control to the event loop between each
message, or else it will never let the server run. That's why it includes
``await asyncio.sleep(1)``.

A complete example is available in the `experiments/broadcast`_ directory.

.. _experiments/broadcast: https://github.com/python-websockets/websockets/tree/main/experiments/broadcast

The naive way
-------------

The most obvious way to send a message to all connected clients consists in
keeping track of them and sending the message to each of them.

Here's a connection handler that registers clients in a global variable::

   CLIENTS = set()

   async def handler(websocket):
       CLIENTS.add(websocket)
       try:
           await websocket.wait_closed()
       finally:
           CLIENTS.remove(websocket)

This implementation assumes that the client will never send any messages. If
you'd rather not make this assumption, you can change::

       await websocket.wait_closed()

to::

       async for _ in websocket:
           pass

Here's a coroutine that broadcasts a message to all clients::

   async def broadcast(message):
       for websocket in CLIENTS.copy():
           try:
               await websocket.send(message)
           except websockets.ConnectionClosed:
               pass

There are two tricks in this version of ``broadcast()``.

First, it makes a copy of ``CLIENTS`` before iterating it. Otherwise, if a
client connected or disconnected while ``broadcast()`` was running, the loop
would fail with::

   RuntimeError: Set changed size during iteration

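To see this failure in isolation, here's a minimal reproduction with a plain
set standing in for ``CLIENTS``::

   clients = {1, 2, 3}  # stand-ins for connected clients

   def broken_broadcast():
       # Removing items while iterating over the set raises RuntimeError.
       for client in clients:
           clients.discard(client)  # simulates a disconnection mid-loop

   try:
       broken_broadcast()
   except RuntimeError as exc:
       error = str(exc)  # the message quoted above
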
Second, it ignores :exc:`~exceptions.ConnectionClosed` exceptions because a
client could disconnect between the moment ``broadcast()`` makes a copy of
``CLIENTS`` and the moment it sends a message to this client. This is fine: a
client that disconnected doesn't belong to "all connected clients" anymore.

The naive way can be very fast. Indeed, if all connections have enough free
space in their write buffers, ``await websocket.send(message)`` writes the
message and returns immediately, as it doesn't need to wait for the buffer to
drain. In this case, ``broadcast()`` doesn't yield control to the event loop,
which minimizes overhead.

The naive way can also fail badly. If the write buffer of a connection reaches
``write_limit``, ``broadcast()`` waits for the buffer to drain before sending
the message to other clients. This can cause a massive drop in performance.

As a consequence, this pattern works only when write buffers never fill up,
which is usually beyond the server's control.

If you know for sure that you will never write more than ``write_limit`` bytes
within ``ping_interval + ping_timeout``, then websockets will terminate slow
connections before the write buffer has time to fill up.

Don't set extreme ``write_limit``, ``ping_interval``, and ``ping_timeout``
values to ensure that this condition holds. Set reasonable values and use the
built-in :func:`broadcast` function instead.
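
For instance, with settings along these lines, a slow connection is closed
before its write buffer can grow much past ``write_limit``. The values below
are illustrative — they happen to match the library's defaults — not
recommendations for your workload::

   import asyncio
   import websockets

   async def handler(websocket):
       await websocket.wait_closed()

   async def main():
       async with websockets.serve(
           handler,
           "localhost",
           8765,
           write_limit=2 ** 16,  # 64 KiB high-water mark for write buffers
           ping_interval=20,     # send a ping every 20 seconds
           ping_timeout=20,      # close the connection if the pong is late
       ):
           await asyncio.Future()  # run forever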

The concurrent way
------------------

The naive way didn't work well because it serialized writes, while the whole
point of asynchronous I/O is to perform I/O concurrently.

Let's modify ``broadcast()`` to send messages concurrently::

   async def send(websocket, message):
       try:
           await websocket.send(message)
       except websockets.ConnectionClosed:
           pass

   def broadcast(message):
       for websocket in CLIENTS:
           asyncio.create_task(send(websocket, message))

We move the error handling logic to a new coroutine and schedule
a :class:`~asyncio.Task` to run it instead of executing it immediately.

Since ``broadcast()`` no longer awaits coroutines, we can make it a function
rather than a coroutine and do away with the copy of ``CLIENTS``.

This version of ``broadcast()`` makes clients independent from one another: a
slow client won't block others. As a side effect, it makes messages
independent from one another.

If you broadcast several messages, there is no strong guarantee that they will
be sent in the expected order. Fortunately, the event loop runs tasks in the
order in which they are created, so the order is correct in practice.

Technically, this is an implementation detail of the event loop. However, it
seems unlikely for an event loop to run tasks in an order other than FIFO.

If you wanted to enforce the order without relying on this implementation
detail, you could be tempted to wait until all clients have received the
message::

   async def broadcast(message):
       if CLIENTS:  # asyncio.wait doesn't accept an empty list
           await asyncio.wait([
               asyncio.create_task(send(websocket, message))
               for websocket in CLIENTS
           ])

However, this doesn't really work in practice. Quite often, it will block
until the slowest client times out.
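
If you do take this route, at least bound the wait with ``asyncio.wait``'s
``timeout`` argument so that one stuck client can't stall the broadcast
forever. Here's a sketch where sleeping coroutines stand in for real
connections::

   import asyncio

   async def send(client, delay):
       # Stand-in for websocket.send(); a slow client takes `delay` seconds.
       await asyncio.sleep(delay)
       return client

   async def broadcast_with_timeout():
       tasks = [
           asyncio.create_task(send("fast", 0.01)),
           asyncio.create_task(send("slow", 10)),
       ]
       # Give up on clients that haven't received the message after 0.1s.
       done, pending = await asyncio.wait(tasks, timeout=0.1)
       for task in pending:
           task.cancel()  # stop waiting for the slow client
       return {task.result() for task in done}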

Backpressure meets broadcast
----------------------------

At this point, it becomes apparent that backpressure, usually a good practice,
doesn't work well when broadcasting a message to thousands of clients.

When you're sending messages to a single client, you don't want to send them
faster than the network can transfer them and the client can accept them. This
is why :meth:`~server.WebSocketServerProtocol.send` checks if the write buffer
is full and, if it is, waits until it drains, giving the network and the
client time to catch up. This provides backpressure.

Without backpressure, you could pile up data in the write buffer until the
server process runs out of memory and the operating system kills it.
    188 
    189 The :meth:`~server.WebSocketServerProtocol.send` API is designed to enforce
    190 backpressure by default. This helps users of websockets write robust programs
    191 even if they never heard about backpressure.
    192 
    193 For comparison, :class:`asyncio.StreamWriter` requires users to understand
    194 backpressure and to await :meth:`~asyncio.StreamWriter.drain` explicitly
    195 after each :meth:`~asyncio.StreamWriter.write`.
    196 
    197 When broadcasting messages, backpressure consists in slowing down all clients
    198 in an attempt to let the slowest client catch up. With thousands of clients,
    199 the slowest one is probably timing out and isn't going to receive the message
    200 anyway. So it doesn't make sense to synchronize with the slowest client.

How do we avoid running out of memory when slow clients can't keep up with the
broadcast rate, then? The most straightforward option is to disconnect them.

If a client gets too far behind, eventually it reaches the limit defined by
``ping_timeout`` and websockets terminates the connection. You can read the
discussion of :doc:`keepalive and timeouts <./timeouts>` for details.

How :func:`broadcast` works
---------------------------

The built-in :func:`broadcast` function is similar to the naive way. The main
difference is that it doesn't apply backpressure.

This provides the best performance by avoiding the overhead of scheduling and
running one task per client.

Also, when sending text messages, encoding to UTF-8 happens only once rather
than once per client, providing a small performance gain.
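
Using it mirrors the naive way: keep a registry of connections and pass it to
:func:`broadcast`, which writes to each connection's buffer without awaiting::

   import websockets

   CLIENTS = set()

   async def handler(websocket):
       CLIENTS.add(websocket)
       try:
           await websocket.wait_closed()
       finally:
           CLIENTS.remove(websocket)

   def broadcast(message):
       # Never blocks: a slow client can't delay the others.
       websockets.broadcast(CLIENTS, message)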

Per-client queues
-----------------

At this point, we deal with slow clients rather brutally: we disconnect them.

Can we do better? For example, we could decide to skip or to batch messages,
depending on how far behind a client is.

To implement this logic, we can create a queue of messages for each client and
run a task that gets messages from the queue and sends them to the client::

   import asyncio

   CLIENTS = set()

   async def relay(queue, websocket):
       while True:
           # Implement custom logic based on queue.qsize() and
           # websocket.transport.get_write_buffer_size() here.
           message = await queue.get()
           await websocket.send(message)

   async def handler(websocket):
       queue = asyncio.Queue()
       relay_task = asyncio.create_task(relay(queue, websocket))
       CLIENTS.add(queue)
       try:
           await websocket.wait_closed()
       finally:
           CLIENTS.remove(queue)
           relay_task.cancel()

Then we can broadcast a message by pushing it to all queues::

   def broadcast(message):
       for queue in CLIENTS:
           queue.put_nowait(message)

The queues provide an additional buffer between the ``broadcast()`` function
and clients. This makes it easier to support slow clients without excessive
memory usage because queued messages aren't duplicated to write buffers
until ``relay()`` processes them.
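
For example, here's a variant of ``relay()`` that handles a backlog by
skipping every queued message except the most recent one; whether dropping
messages is acceptable depends on your application::

   async def relay(queue, websocket):
       while True:
           message = await queue.get()
           # Coalesce the backlog: a client that fell behind only
           # gets the latest message.
           while not queue.empty():
               message = queue.get_nowait()
           await websocket.send(message)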

Publish–subscribe
-----------------

Can we avoid centralizing the list of connected clients in a global variable?

If each client subscribes to a stream of messages, then broadcasting becomes
as simple as publishing a message to the stream.

Here's a message stream that supports multiple consumers::

   class PubSub:
       def __init__(self):
           self.waiter = asyncio.Future()

       def publish(self, value):
           waiter, self.waiter = self.waiter, asyncio.Future()
           waiter.set_result((value, self.waiter))

       async def subscribe(self):
           waiter = self.waiter
           while True:
               value, waiter = await waiter
               yield value

       __aiter__ = subscribe

   PUBSUB = PubSub()

The stream is implemented as a linked list of futures. It isn't necessary to
synchronize consumers. They can read the stream at their own pace,
independently from one another. Once all consumers have read a message, there
are no references left, so the garbage collector deletes it.
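
To check that consumers really don't need to synchronize, here's a
self-contained sketch: it repeats the ``PubSub`` class and shows two
subscribers each receiving every published value at their own pace::

   import asyncio

   class PubSub:
       def __init__(self):
           self.waiter = asyncio.Future()

       def publish(self, value):
           waiter, self.waiter = self.waiter, asyncio.Future()
           waiter.set_result((value, self.waiter))

       async def subscribe(self):
           waiter = self.waiter
           while True:
               value, waiter = await waiter
               yield value

       __aiter__ = subscribe

   async def consume(pubsub, count):
       # Collect `count` values from the stream, then stop.
       values = []
       async for value in pubsub:
           values.append(value)
           if len(values) == count:
               return values

   async def demo():
       pubsub = PubSub()
       consumers = [asyncio.create_task(consume(pubsub, 2)) for _ in range(2)]
       await asyncio.sleep(0)  # let both consumers start waiting
       pubsub.publish("a")
       pubsub.publish("b")
       return await asyncio.gather(*consumers)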

The connection handler subscribes to the stream and sends messages::

   async def handler(websocket):
       async for message in PUBSUB:
           await websocket.send(message)

The broadcast function publishes to the stream::

   def broadcast(message):
       PUBSUB.publish(message)

Like per-client queues, this version supports slow clients with limited memory
usage. Unlike per-client queues, it makes it difficult to tell how far behind
a client is. The ``PubSub`` class could be extended or refactored to provide
this information.

The ``for`` loop is gone from this version of the ``broadcast()`` function.
However, there's still a ``for`` loop iterating on all clients hidden deep
inside :mod:`asyncio`. When ``publish()`` sets the result of the ``waiter``
future, :mod:`asyncio` loops on callbacks registered with this future and
schedules them. This is how connection handlers receive the next value from
the asynchronous iterator returned by ``subscribe()``.

Performance considerations
--------------------------

The built-in :func:`broadcast` function sends all messages without yielding
control to the event loop. So does the naive way when the network and clients
are fast and reliable.

For each client, a WebSocket frame is prepared and sent to the network. This
is the minimum amount of work required to broadcast a message.

It would be tempting to prepare a frame and reuse it for all connections.
However, this isn't possible in general for two reasons:

* Clients can negotiate different extensions. You would have to enforce the
  same extensions with the same parameters. For example, you would have to
  select some compression settings and reject clients that cannot support
  these settings.

* Extensions can be stateful, producing different encodings of the same
  message depending on previous messages. For example, you would have to
  disable context takeover to make compression stateless, resulting in poor
  compression rates.

All other patterns discussed above yield control to the event loop once per
client because messages are sent by different tasks. This makes them slower
than the built-in :func:`broadcast` function.

There is no major difference between the performance of per-client queues and
publish–subscribe.