Broadcasting messages
=====================

.. currentmodule:: websockets

.. admonition:: If you just want to send a message to all connected clients,
                use :func:`broadcast`.
    :class: tip

    If you want to learn about its design in depth, continue reading this
    document.

WebSocket servers often send the same message to all connected clients or to a
subset of clients for which the message is relevant.

Let's explore options for broadcasting a message, explain the design of
:func:`broadcast`, and discuss alternatives.

For each option, we'll provide a connection handler called ``handler()`` and a
function or coroutine called ``broadcast()`` that sends a message to all
connected clients.

Integrating them is left as an exercise for the reader. You could start with::

    import asyncio
    import websockets

    async def handler(websocket):
        ...

    async def broadcast(message):
        ...

    async def broadcast_messages():
        while True:
            await asyncio.sleep(1)
            message = ...  # your application logic goes here
            await broadcast(message)

    async def main():
        async with websockets.serve(handler, "localhost", 8765):
            await broadcast_messages()  # runs forever

    if __name__ == "__main__":
        asyncio.run(main())

``broadcast_messages()`` must yield control to the event loop between each
message, or else it will never let the server run. That's why it includes
``await asyncio.sleep(1)``.

A complete example is available in the `experiments/broadcast`_ directory.

.. _experiments/broadcast: https://github.com/python-websockets/websockets/tree/main/experiments/broadcast

The naive way
-------------

The most obvious way to send a message to all connected clients consists in
keeping track of them and sending the message to each of them.
Here's a connection handler that registers clients in a global variable::

    CLIENTS = set()

    async def handler(websocket):
        CLIENTS.add(websocket)
        try:
            await websocket.wait_closed()
        finally:
            CLIENTS.remove(websocket)

This implementation assumes that the client will never send any messages. If
you'd rather not make this assumption, you can change::

    await websocket.wait_closed()

to::

    async for _ in websocket:
        pass

Here's a coroutine that broadcasts a message to all clients::

    async def broadcast(message):
        for websocket in CLIENTS.copy():
            try:
                await websocket.send(message)
            except websockets.ConnectionClosed:
                pass

There are two tricks in this version of ``broadcast()``.

First, it makes a copy of ``CLIENTS`` before iterating it. Else, if a client
connects or disconnects while ``broadcast()`` is running, the loop would fail
with::

    RuntimeError: Set changed size during iteration

Second, it ignores :exc:`~exceptions.ConnectionClosed` exceptions because a
client could disconnect between the moment ``broadcast()`` makes a copy of
``CLIENTS`` and the moment it sends a message to this client. This is fine: a
client that disconnected doesn't belong to "all connected clients" anymore.

The naive way can be very fast. Indeed, if all connections have enough free
space in their write buffers, ``await websocket.send(message)`` writes the
message and returns immediately, as it doesn't need to wait for the buffer to
drain. In this case, ``broadcast()`` doesn't yield control to the event loop,
which minimizes overhead.

The naive way can also fail badly. If the write buffer of a connection reaches
``write_limit``, ``broadcast()`` waits for the buffer to drain before sending
the message to other clients. This can cause a massive drop in performance.
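This failure mode is easy to feel without any sockets. Here's a toy timing
sketch, with ``asyncio.sleep()`` standing in for a ``send()`` that blocks on a
full write buffer (the delays and the ``send()`` / ``naive_broadcast()``
helpers are invented for illustration):

```python
import asyncio
import time

async def send(delay):
    # Stand-in for ``await websocket.send(message)`` on a connection
    # whose write buffer is full: it waits for the buffer to drain.
    await asyncio.sleep(delay)

async def naive_broadcast(delays):
    # Sequential sends, like the naive broadcast() above.
    for delay in delays:
        await send(delay)

async def main():
    start = time.monotonic()
    # One slow client (0.2 s) followed by two fast ones.
    await naive_broadcast([0.2, 0.0, 0.0])
    return time.monotonic() - start

elapsed = asyncio.run(main())
# Every client after the slow one had to wait: elapsed is at least 0.2 s.
```

The total time grows with the sum of the delays, which is why a single slow
connection degrades the whole broadcast.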
As a consequence, this pattern works only when write buffers never fill up,
which is usually outside of the control of the server.

If you know for sure that you will never write more than ``write_limit`` bytes
within ``ping_interval + ping_timeout``, then websockets will terminate slow
connections before the write buffer has time to fill up.

Don't set extreme ``write_limit``, ``ping_interval``, and ``ping_timeout``
values to ensure that this condition holds. Set reasonable values and use the
built-in :func:`broadcast` function instead.

The concurrent way
------------------

The naive way didn't work well because it serialized writes, while the whole
point of asynchronous I/O is to perform I/O concurrently.

Let's modify ``broadcast()`` to send messages concurrently::

    async def send(websocket, message):
        try:
            await websocket.send(message)
        except websockets.ConnectionClosed:
            pass

    def broadcast(message):
        for websocket in CLIENTS:
            asyncio.create_task(send(websocket, message))

We move the error handling logic into a new coroutine and we schedule a
:class:`~asyncio.Task` to run it instead of executing it immediately.

Since ``broadcast()`` no longer awaits coroutines, we can make it a function
rather than a coroutine and do away with the copy of ``CLIENTS``.

This version of ``broadcast()`` makes clients independent from one another: a
slow client won't block others. As a side effect, it makes messages
independent from one another.

If you broadcast several messages, there is no strong guarantee that they will
be sent in the expected order. Fortunately, the event loop runs tasks in the
order in which they are created, so the order is correct in practice.

Technically, this is an implementation detail of the event loop.
However, it
seems unlikely for an event loop to run tasks in an order other than FIFO.

If you wanted to enforce the order without relying on this implementation
detail, you could be tempted to wait until all clients have received the
message::

    async def broadcast(message):
        if CLIENTS:  # asyncio.wait doesn't accept an empty list
            await asyncio.wait([
                asyncio.create_task(send(websocket, message))
                for websocket in CLIENTS
            ])

However, this doesn't really work in practice. Quite often, it will block
until the slowest client times out.

Backpressure meets broadcast
----------------------------

At this point, it becomes apparent that backpressure, usually a good practice,
doesn't work well when broadcasting a message to thousands of clients.

When you're sending messages to a single client, you don't want to send them
faster than the network can transfer them and the client can accept them. This
is why :meth:`~server.WebSocketServerProtocol.send` checks if the write buffer
is full and, if it is, waits until it drains, giving the network and the
client time to catch up. This provides backpressure.

Without backpressure, you could pile up data in the write buffer until the
server process runs out of memory and the operating system kills it.

The :meth:`~server.WebSocketServerProtocol.send` API is designed to enforce
backpressure by default. This helps users of websockets write robust programs
even if they have never heard about backpressure.

For comparison, :class:`asyncio.StreamWriter` requires users to understand
backpressure and to await :meth:`~asyncio.StreamWriter.drain` explicitly
after each :meth:`~asyncio.StreamWriter.write`.

When broadcasting messages, backpressure consists in slowing down all clients
in an attempt to let the slowest client catch up.
With thousands of clients,
the slowest one is probably timing out and isn't going to receive the message
anyway. So it doesn't make sense to synchronize with the slowest client.

How do we avoid running out of memory when slow clients can't keep up with the
broadcast rate, then? The most straightforward option is to disconnect them.

If a client gets too far behind, eventually it reaches the limit defined by
``ping_timeout`` and websockets terminates the connection. You can read the
discussion of :doc:`keepalive and timeouts <./timeouts>` for details.

How :func:`broadcast` works
---------------------------

The built-in :func:`broadcast` function is similar to the naive way. The main
difference is that it doesn't apply backpressure.

This provides the best performance by avoiding the overhead of scheduling and
running one task per client.

Also, when sending text messages, encoding to UTF-8 happens only once rather
than once per client, providing a small performance gain.

Per-client queues
-----------------

At this point, we deal with slow clients rather brutally: we disconnect them.

Can we do better? For example, we could decide to skip or to batch messages,
depending on how far behind a client is.

To implement this logic, we can create a queue of messages for each client and
run a task that gets messages from the queue and sends them to the client::

    import asyncio

    CLIENTS = set()

    async def relay(queue, websocket):
        while True:
            # Implement custom logic based on queue.qsize() and
            # websocket.transport.get_write_buffer_size() here.
            message = await queue.get()
            await websocket.send(message)

    async def handler(websocket):
        queue = asyncio.Queue()
        relay_task = asyncio.create_task(relay(queue, websocket))
        CLIENTS.add(queue)
        try:
            await websocket.wait_closed()
        finally:
            CLIENTS.remove(queue)
            relay_task.cancel()

Then we can broadcast a message by pushing it to all queues::

    def broadcast(message):
        for queue in CLIENTS:
            queue.put_nowait(message)

The queues provide an additional buffer between the ``broadcast()`` function
and clients. This makes it easier to support slow clients without excessive
memory usage because queued messages aren't duplicated to write buffers
until ``relay()`` processes them.

Publish–subscribe
-----------------

Can we avoid centralizing the list of connected clients in a global variable?

If each client subscribes to a stream of messages, then broadcasting becomes
as simple as publishing a message to the stream.

Here's a message stream that supports multiple consumers::

    class PubSub:
        def __init__(self):
            self.waiter = asyncio.Future()

        def publish(self, value):
            waiter, self.waiter = self.waiter, asyncio.Future()
            waiter.set_result((value, self.waiter))

        async def subscribe(self):
            waiter = self.waiter
            while True:
                value, waiter = await waiter
                yield value

        __aiter__ = subscribe

    PUBSUB = PubSub()

The stream is implemented as a linked list of futures. It isn't necessary to
synchronize consumers. They can read the stream at their own pace,
independently from one another. Once all consumers have read a message, there
are no references left, therefore the garbage collector deletes it.
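To watch the stream work outside of a server, here's a self-contained sketch
reusing the ``PubSub`` class above (the consumer names and message values are
invented for the demo):

```python
import asyncio

class PubSub:
    # Same linked-list-of-futures stream as above.
    def __init__(self):
        self.waiter = asyncio.Future()

    def publish(self, value):
        waiter, self.waiter = self.waiter, asyncio.Future()
        waiter.set_result((value, self.waiter))

    async def subscribe(self):
        waiter = self.waiter
        while True:
            value, waiter = await waiter
            yield value

    __aiter__ = subscribe

async def main():
    pubsub = PubSub()
    received = {}

    async def consumer(name):
        messages = []
        async for value in pubsub:
            messages.append(value)
            if len(messages) == 2:
                break
        received[name] = messages

    # Subscribe two consumers, then publish two messages.
    tasks = [asyncio.create_task(consumer(name)) for name in ("a", "b")]
    await asyncio.sleep(0)  # let both consumers start awaiting the stream
    pubsub.publish("hello")
    pubsub.publish("world")
    await asyncio.gather(*tasks)
    return received

received = asyncio.run(main())
# Both consumers saw both messages, in publication order.
```

Note that a consumer only sees messages published after it subscribes, which
is the expected semantics for a live broadcast.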
The connection handler subscribes to the stream and sends messages::

    async def handler(websocket):
        async for message in PUBSUB:
            await websocket.send(message)

The broadcast function publishes to the stream::

    def broadcast(message):
        PUBSUB.publish(message)

Like per-client queues, this version supports slow clients with limited memory
usage. Unlike per-client queues, it makes it difficult to tell how far behind
a client is. The ``PubSub`` class could be extended or refactored to provide
this information.

The ``for`` loop is gone from this version of the ``broadcast()`` function.
However, there's still a ``for`` loop iterating on all clients hidden deep
inside :mod:`asyncio`. When ``publish()`` sets the result of the ``waiter``
future, :mod:`asyncio` loops on callbacks registered with this future and
schedules them. This is how connection handlers receive the next value from
the asynchronous iterator returned by ``subscribe()``.

Performance considerations
--------------------------

The built-in :func:`broadcast` function sends all messages without yielding
control to the event loop. So does the naive way when the network and clients
are fast and reliable.

For each client, a WebSocket frame is prepared and sent to the network. This
is the minimum amount of work required to broadcast a message.

It would be tempting to prepare a frame and reuse it for all connections.
However, this isn't possible in general for two reasons:

* Clients can negotiate different extensions. You would have to enforce the
  same extensions with the same parameters. For example, you would have to
  select some compression settings and reject clients that cannot support
  these settings.

* Extensions can be stateful, producing different encodings of the same
  message depending on previous messages.
  For example, you would have to
  disable context takeover to make compression stateless, resulting in poor
  compression rates.

All other patterns discussed above yield control to the event loop once per
client because messages are sent by different tasks. This makes them slower
than the built-in :func:`broadcast` function.

There is no major difference between the performance of per-client queues and
publish–subscribe.
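The flip side of that per-task scheduling overhead is that tasks decouple
clients, as a toy measurement shows: with one task per client, the broadcast
takes roughly as long as the slowest send rather than the sum of all sends
(``asyncio.sleep()`` stands in for ``send()``; the delays are invented):

```python
import asyncio
import time

async def send(delay):
    # Stand-in for a websocket.send() that blocks for `delay` seconds.
    await asyncio.sleep(delay)

async def broadcast_with_tasks(delays):
    # One task per client, as in the concurrent way.
    await asyncio.gather(*(send(delay) for delay in delays))

async def main():
    start = time.monotonic()
    # One slow client (0.2 s) and nine moderately slow ones (0.1 s each).
    await broadcast_with_tasks([0.2] + [0.1] * 9)
    return time.monotonic() - start

elapsed = asyncio.run(main())
# elapsed tracks the slowest client (~0.2 s), not the 1.1 s total.
```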