streams.py (8924B)
"""
Benchmark two possible implementations of a stream reader.

The difference lies in the data structure that buffers incoming data:

* ``ByteArrayStreamReader`` uses a ``bytearray``;
* ``BytesDequeStreamReader`` uses a ``deque[bytes]``.

``ByteArrayStreamReader`` is faster for streaming small frames, which is the
standard use case of websockets, likely due to its simple implementation and
to ``bytearray`` being fast at appending data and removing data at the front
(https://hg.python.org/cpython/rev/499a96611baa).

``BytesDequeStreamReader`` is faster for large frames and for bursts, likely
because it copies payloads only once, while ``ByteArrayStreamReader`` copies
them twice.

"""


import collections
import os
import timeit


# Implementations


class ByteArrayStreamReader:
    """Stream reader that buffers incoming data in a single ``bytearray``.

    ``readline`` and ``readexactly`` are generator functions: they ``yield``
    (no value) whenever more data is needed, and deliver their result as the
    generator's return value, to be consumed with ``yield from``.
    """

    def __init__(self):
        self.buffer = bytearray()  # buffered, not-yet-consumed bytes
        self.eof = False  # set by feed_eof(); reported by at_eof()

    def readline(self):
        """Read one line, up to and including the newline byte.

        Suspends (``yield``) until a newline is buffered; returns the line
        as ``bytearray`` via StopIteration for ``yield from``.
        """
        n = 0  # number of bytes to read
        p = 0  # number of bytes without a newline
        while True:
            n = self.buffer.find(b"\n", p) + 1
            if n > 0:
                break
            # No newline found: remember how far we scanned so the next
            # find() doesn't rescan it, then suspend until more data.
            p = len(self.buffer)
            yield
        r = self.buffer[:n]
        # Removing data at the front of a bytearray is fast in CPython
        # (see the changeset linked in the module docstring).
        del self.buffer[:n]
        return r

    def readexactly(self, n):
        """Read exactly ``n`` bytes, suspending until enough is buffered."""
        assert n >= 0
        while len(self.buffer) < n:
            yield
        r = self.buffer[:n]
        del self.buffer[:n]
        return r

    def feed_data(self, data: bytes) -> None:
        """Append incoming ``data`` at the end of the buffer."""
        self.buffer += data

    def feed_eof(self) -> None:
        """Record that no more data will arrive."""
        self.eof = True

    def at_eof(self) -> bool:
        # EOF is reported only once the buffer has been fully drained.
        return self.eof and not self.buffer


class BytesDequeStreamReader:
    """Stream reader that buffers incoming data in a ``deque`` of ``bytes``.

    Chunks are stored exactly as received, so payloads are copied only when
    they are finally joined; a chunk is split only when a read stops in the
    middle of it.  Same generator-based protocol as ``ByteArrayStreamReader``.
    """

    def __init__(self):
        self.buffer = collections.deque()  # deque of bytes chunks, in order
        self.eof = False

    def readline(self):
        """Read one line, up to and including the newline byte."""
        b = []  # accumulated chunks of the line
        while True:
            # Read next chunk
            while True:
                try:
                    c = self.buffer.popleft()
                except IndexError:
                    # Buffer is empty: suspend until feed_data() runs.
                    yield
                else:
                    break
            # Handle chunk
            n = c.find(b"\n") + 1
            if n == len(c):
                # Read exactly enough data
                b.append(c)
                break
            elif n > 0:
                # Read too much data
                b.append(c[:n])
                # Put the excess back at the front for the next read.
                self.buffer.appendleft(c[n:])
                break
            else:  # n == 0
                # Need to read more data
                b.append(c)
        return b"".join(b)

    def readexactly(self, n):
        """Read exactly ``n`` bytes, suspending until enough is buffered."""
        if n == 0:
            # Early return before the first yield: the generator raises
            # StopIteration(b"") immediately, so ``yield from`` gets b"".
            return b""
        b = []  # accumulated chunks; n counts down the bytes still missing
        while True:
            # Read next chunk
            while True:
                try:
                    c = self.buffer.popleft()
                except IndexError:
                    yield
                else:
                    break
            # Handle chunk
            n -= len(c)
            if n == 0:
                # Read exactly enough data
                b.append(c)
                break
            elif n < 0:
                # Read too much data; n is negative here, so c[:n] drops
                # the excess and c[n:] is pushed back for the next read.
                b.append(c[:n])
                self.buffer.appendleft(c[n:])
                break
            else:  # n > 0
                # Need to read more data
                b.append(c)
        return b"".join(b)

    def feed_data(self, data: bytes) -> None:
        """Append incoming ``data`` as one chunk, without copying it."""
        self.buffer.append(data)

    def feed_eof(self) -> None:
        """Record that no more data will arrive."""
        self.eof = True

    def at_eof(self) -> bool:
        # EOF is reported only once every buffered chunk was consumed.
        return self.eof and not self.buffer


# Tests


class Protocol:
    """Minimal protocol driver: feeds a reader and collects parsed frames."""

    def __init__(self, StreamReader):
        self.reader = StreamReader()
        self.events = []  # frames completed since the last data_received()
        # Start parser coroutine
        self.parser = self.run_parser()
        next(self.parser)  # advance to the first yield (waiting for data)

    def run_parser(self):
        # Alternate forever between a fixed 2-byte read and a line read.
        while True:
            frame = yield from self.reader.readexactly(2)
            self.events.append(frame)
            frame = yield from self.reader.readline()
            self.events.append(frame)

    def data_received(self, data):
        self.reader.feed_data(data)
        next(self.parser)  # run parser until more data is needed
        events, self.events = self.events, []
        return events


def run_test(StreamReader):
    """Sanity-check a reader implementation against expected frame splits."""
    proto = Protocol(StreamReader)

    # One byte is not enough for the 2-byte read: no frame yet.
    actual = proto.data_received(b"a")
    expected = []
    assert actual == expected, f"{actual} != {expected}"

    # Second byte completes the 2-byte frame.
    actual = proto.data_received(b"b")
    expected = [b"ab"]
    assert actual == expected, f"{actual} != {expected}"

    # readline() is now pending; no newline yet.
    actual = proto.data_received(b"c")
    expected = []
    assert actual == expected, f"{actual} != {expected}"

    actual = proto.data_received(b"\n")
    expected = [b"c\n"]
    assert actual == expected, f"{actual} != {expected}"

    # A burst spanning several frames is split correctly.
    actual = proto.data_received(b"efghi\njklmn")
    expected = [b"ef", b"ghi\n", b"jk"]
    assert actual == expected, f"{actual} != {expected}"


# Benchmarks


def get_frame_packets(size, packet_size=None):
    """Build one frame with ``size`` random payload bytes.

    The header mimics websocket length encoding: a 1-byte length below 126,
    2 extra length bytes below 65536, 4 extra length bytes otherwise.
    Returns the frame as a single packet, or split into ``packet_size``-byte
    packets when ``packet_size`` is given.
    """
    if size < 126:
        frame = bytes([138, size])
    elif size < 65536:
        frame = bytes([138, 126]) + bytes(divmod(size, 256))
    else:
        size1, size2 = divmod(size, 65536)
        frame = (
            bytes([138, 127]) + bytes(divmod(size1, 256)) + bytes(divmod(size2, 256))
        )
    frame += os.urandom(size)
    if packet_size is None:
        return [frame]
    else:
        packets = []
        while frame:
            packets.append(frame[:packet_size])
            frame = frame[packet_size:]
        return packets


def benchmark_stream(StreamReader, packets, size, count):
    """Feed one frame's packets, then parse it — repeated ``count`` times.

    This models the streaming case: data arrives roughly one frame at a time.
    """
    reader = StreamReader()
    for _ in range(count):
        for packet in packets:
            reader.feed_data(packet)
        yield from reader.readexactly(2)
        # Consume the extended length bytes matching get_frame_packets().
        if size >= 65536:
            yield from reader.readexactly(4)
        elif size >= 126:
            yield from reader.readexactly(2)
        yield from reader.readexactly(size)
    reader.feed_eof()
    assert reader.at_eof()


def benchmark_burst(StreamReader, packets, size, count):
    """Feed all frames up front, then parse them all — the burst case."""
    reader = StreamReader()
    for _ in range(count):
        for packet in packets:
            reader.feed_data(packet)
    reader.feed_eof()
    for _ in range(count):
        yield from reader.readexactly(2)
        # Consume the extended length bytes matching get_frame_packets().
        if size >= 65536:
            yield from reader.readexactly(4)
        elif size >= 126:
            yield from reader.readexactly(2)
        yield from reader.readexactly(size)
    assert reader.at_eof()


def run_benchmark(size, count, packet_size=None, number=1000):
    """Time both readers in both scenarios and print a comparison.

    The reader class and scenario are injected into the module globals so
    that the ``stmt`` string evaluated by timeit can see them.
    """
    stmt = f"list(benchmark(StreamReader, packets, {size}, {count}))"
    setup = f"packets = get_frame_packets({size}, {packet_size})"
    context = globals()

    context["StreamReader"] = context["ByteArrayStreamReader"]
    context["benchmark"] = context["benchmark_stream"]
    bas = min(timeit.repeat(stmt, setup, number=number, globals=context))
    context["benchmark"] = context["benchmark_burst"]
    bab = min(timeit.repeat(stmt, setup, number=number, globals=context))

    context["StreamReader"] = context["BytesDequeStreamReader"]
    context["benchmark"] = context["benchmark_stream"]
    bds = min(timeit.repeat(stmt, setup, number=number, globals=context))
    context["benchmark"] = context["benchmark_burst"]
    bdb = min(timeit.repeat(stmt, setup, number=number, globals=context))

    print(
        f"Frame size = {size} bytes, "
        f"frame count = {count}, "
        f"packet size = {packet_size}"
    )
    print(f"* ByteArrayStreamReader (stream): {bas / number * 1_000_000:.1f}µs")
    print(
        f"* BytesDequeStreamReader (stream): "
        f"{bds / number * 1_000_000:.1f}µs ({(bds / bas - 1) * 100:+.1f}%)"
    )
    print(f"* ByteArrayStreamReader (burst): {bab / number * 1_000_000:.1f}µs")
    print(
        f"* BytesDequeStreamReader (burst): "
        f"{bdb / number * 1_000_000:.1f}µs ({(bdb / bab - 1) * 100:+.1f}%)"
    )
    print()


if __name__ == "__main__":
    # Correctness first: both implementations must agree on framing.
    run_test(ByteArrayStreamReader)
    run_test(BytesDequeStreamReader)

    # Whole frames, various sizes.
    run_benchmark(size=8, count=1000)
    run_benchmark(size=60, count=1000)
    run_benchmark(size=500, count=500)
    run_benchmark(size=4_000, count=200)
    run_benchmark(size=30_000, count=100)
    run_benchmark(size=250_000, count=50)
    run_benchmark(size=2_000_000, count=20)

    # Frames split into packets of increasing sizes.
    run_benchmark(size=4_000, count=200, packet_size=1024)
    run_benchmark(size=30_000, count=100, packet_size=1024)
    run_benchmark(size=250_000, count=50, packet_size=1024)
    run_benchmark(size=2_000_000, count=20, packet_size=1024)

    run_benchmark(size=30_000, count=100, packet_size=4096)
    run_benchmark(size=250_000, count=50, packet_size=4096)
    run_benchmark(size=2_000_000, count=20, packet_size=4096)

    run_benchmark(size=30_000, count=100, packet_size=16384)
    run_benchmark(size=250_000, count=50, packet_size=16384)
    run_benchmark(size=2_000_000, count=20, packet_size=16384)

    run_benchmark(size=250_000, count=50, packet_size=65536)
    run_benchmark(size=2_000_000, count=20, packet_size=65536)