tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

streams.py (8924B)


      1 """
      2 Benchmark two possible implementations of a stream reader.
      3 
      4 The difference lies in the data structure that buffers incoming data:
      5 
      6 * ``ByteArrayStreamReader`` uses a ``bytearray``;
      7 * ``BytesDequeStreamReader`` uses a ``deque[bytes]``.
      8 
      9 ``ByteArrayStreamReader`` is faster for streaming small frames, which is the
     10 standard use case of websockets, likely due to its simple implementation and
     11 to ``bytearray`` being fast at appending data and removing data at the front
     12 (https://hg.python.org/cpython/rev/499a96611baa).
     13 
     14 ``BytesDequeStreamReader`` is faster for large frames and for bursts, likely
     15 because it copies payloads only once, while ``ByteArrayStreamReader`` copies
     16 them twice.
     17 
     18 """
     19 
     20 
     21 import collections
     22 import os
     23 import timeit
     24 
     25 
     26 # Implementations
     27 
     28 
     29 class ByteArrayStreamReader:
     30    def __init__(self):
     31        self.buffer = bytearray()
     32        self.eof = False
     33 
     34    def readline(self):
     35        n = 0  # number of bytes to read
     36        p = 0  # number of bytes without a newline
     37        while True:
     38            n = self.buffer.find(b"\n", p) + 1
     39            if n > 0:
     40                break
     41            p = len(self.buffer)
     42            yield
     43        r = self.buffer[:n]
     44        del self.buffer[:n]
     45        return r
     46 
     47    def readexactly(self, n):
     48        assert n >= 0
     49        while len(self.buffer) < n:
     50            yield
     51        r = self.buffer[:n]
     52        del self.buffer[:n]
     53        return r
     54 
     55    def feed_data(self, data):
     56        self.buffer += data
     57 
     58    def feed_eof(self):
     59        self.eof = True
     60 
     61    def at_eof(self):
     62        return self.eof and not self.buffer
     63 
     64 
     65 class BytesDequeStreamReader:
     66    def __init__(self):
     67        self.buffer = collections.deque()
     68        self.eof = False
     69 
     70    def readline(self):
     71        b = []
     72        while True:
     73            # Read next chunk
     74            while True:
     75                try:
     76                    c = self.buffer.popleft()
     77                except IndexError:
     78                    yield
     79                else:
     80                    break
     81            # Handle chunk
     82            n = c.find(b"\n") + 1
     83            if n == len(c):
     84                # Read exactly enough data
     85                b.append(c)
     86                break
     87            elif n > 0:
     88                # Read too much data
     89                b.append(c[:n])
     90                self.buffer.appendleft(c[n:])
     91                break
     92            else:  # n == 0
     93                # Need to read more data
     94                b.append(c)
     95        return b"".join(b)
     96 
     97    def readexactly(self, n):
     98        if n == 0:
     99            return b""
    100        b = []
    101        while True:
    102            # Read next chunk
    103            while True:
    104                try:
    105                    c = self.buffer.popleft()
    106                except IndexError:
    107                    yield
    108                else:
    109                    break
    110            # Handle chunk
    111            n -= len(c)
    112            if n == 0:
    113                # Read exactly enough data
    114                b.append(c)
    115                break
    116            elif n < 0:
    117                # Read too much data
    118                b.append(c[:n])
    119                self.buffer.appendleft(c[n:])
    120                break
    121            else:  # n >= 0
    122                # Need to read more data
    123                b.append(c)
    124        return b"".join(b)
    125 
    126    def feed_data(self, data):
    127        self.buffer.append(data)
    128 
    129    def feed_eof(self):
    130        self.eof = True
    131 
    132    def at_eof(self):
    133        return self.eof and not self.buffer
    134 
    135 
    136 # Tests
    137 
    138 
    139 class Protocol:
    140    def __init__(self, StreamReader):
    141        self.reader = StreamReader()
    142        self.events = []
    143        # Start parser coroutine
    144        self.parser = self.run_parser()
    145        next(self.parser)
    146 
    147    def run_parser(self):
    148        while True:
    149            frame = yield from self.reader.readexactly(2)
    150            self.events.append(frame)
    151            frame = yield from self.reader.readline()
    152            self.events.append(frame)
    153 
    154    def data_received(self, data):
    155        self.reader.feed_data(data)
    156        next(self.parser)  # run parser until more data is needed
    157        events, self.events = self.events, []
    158        return events
    159 
    160 
    161 def run_test(StreamReader):
    162    proto = Protocol(StreamReader)
    163 
    164    actual = proto.data_received(b"a")
    165    expected = []
    166    assert actual == expected, f"{actual} != {expected}"
    167 
    168    actual = proto.data_received(b"b")
    169    expected = [b"ab"]
    170    assert actual == expected, f"{actual} != {expected}"
    171 
    172    actual = proto.data_received(b"c")
    173    expected = []
    174    assert actual == expected, f"{actual} != {expected}"
    175 
    176    actual = proto.data_received(b"\n")
    177    expected = [b"c\n"]
    178    assert actual == expected, f"{actual} != {expected}"
    179 
    180    actual = proto.data_received(b"efghi\njklmn")
    181    expected = [b"ef", b"ghi\n", b"jk"]
    182    assert actual == expected, f"{actual} != {expected}"
    183 
    184 
    185 # Benchmarks
    186 
    187 
    188 def get_frame_packets(size, packet_size=None):
    189    if size < 126:
    190        frame = bytes([138, size])
    191    elif size < 65536:
    192        frame = bytes([138, 126]) + bytes(divmod(size, 256))
    193    else:
    194        size1, size2 = divmod(size, 65536)
    195        frame = (
    196            bytes([138, 127]) + bytes(divmod(size1, 256)) + bytes(divmod(size2, 256))
    197        )
    198    frame += os.urandom(size)
    199    if packet_size is None:
    200        return [frame]
    201    else:
    202        packets = []
    203        while frame:
    204            packets.append(frame[:packet_size])
    205            frame = frame[packet_size:]
    206        return packets
    207 
    208 
    209 def benchmark_stream(StreamReader, packets, size, count):
    210    reader = StreamReader()
    211    for _ in range(count):
    212        for packet in packets:
    213            reader.feed_data(packet)
    214        yield from reader.readexactly(2)
    215        if size >= 65536:
    216            yield from reader.readexactly(4)
    217        elif size >= 126:
    218            yield from reader.readexactly(2)
    219        yield from reader.readexactly(size)
    220    reader.feed_eof()
    221    assert reader.at_eof()
    222 
    223 
    224 def benchmark_burst(StreamReader, packets, size, count):
    225    reader = StreamReader()
    226    for _ in range(count):
    227        for packet in packets:
    228            reader.feed_data(packet)
    229    reader.feed_eof()
    230    for _ in range(count):
    231        yield from reader.readexactly(2)
    232        if size >= 65536:
    233            yield from reader.readexactly(4)
    234        elif size >= 126:
    235            yield from reader.readexactly(2)
    236        yield from reader.readexactly(size)
    237    assert reader.at_eof()
    238 
    239 
    240 def run_benchmark(size, count, packet_size=None, number=1000):
    241    stmt = f"list(benchmark(StreamReader, packets, {size}, {count}))"
    242    setup = f"packets = get_frame_packets({size}, {packet_size})"
    243    context = globals()
    244 
    245    context["StreamReader"] = context["ByteArrayStreamReader"]
    246    context["benchmark"] = context["benchmark_stream"]
    247    bas = min(timeit.repeat(stmt, setup, number=number, globals=context))
    248    context["benchmark"] = context["benchmark_burst"]
    249    bab = min(timeit.repeat(stmt, setup, number=number, globals=context))
    250 
    251    context["StreamReader"] = context["BytesDequeStreamReader"]
    252    context["benchmark"] = context["benchmark_stream"]
    253    bds = min(timeit.repeat(stmt, setup, number=number, globals=context))
    254    context["benchmark"] = context["benchmark_burst"]
    255    bdb = min(timeit.repeat(stmt, setup, number=number, globals=context))
    256 
    257    print(
    258        f"Frame size = {size} bytes, "
    259        f"frame count = {count}, "
    260        f"packet size = {packet_size}"
    261    )
    262    print(f"* ByteArrayStreamReader  (stream): {bas / number * 1_000_000:.1f}µs")
    263    print(
    264        f"* BytesDequeStreamReader (stream): "
    265        f"{bds / number * 1_000_000:.1f}µs ({(bds / bas - 1) * 100:+.1f}%)"
    266    )
    267    print(f"* ByteArrayStreamReader  (burst):  {bab / number * 1_000_000:.1f}µs")
    268    print(
    269        f"* BytesDequeStreamReader (burst):  "
    270        f"{bdb / number * 1_000_000:.1f}µs ({(bdb / bab - 1) * 100:+.1f}%)"
    271    )
    272    print()
    273 
    274 
if __name__ == "__main__":
    # Verify both implementations behave identically before timing them.
    run_test(ByteArrayStreamReader)
    run_test(BytesDequeStreamReader)

    # Whole-frame delivery: each frame arrives as a single packet.
    run_benchmark(size=8, count=1000)
    run_benchmark(size=60, count=1000)
    run_benchmark(size=500, count=500)
    run_benchmark(size=4_000, count=200)
    run_benchmark(size=30_000, count=100)
    run_benchmark(size=250_000, count=50)
    run_benchmark(size=2_000_000, count=20)

    # Fragmented delivery: frames split into 1 KiB packets.
    run_benchmark(size=4_000, count=200, packet_size=1024)
    run_benchmark(size=30_000, count=100, packet_size=1024)
    run_benchmark(size=250_000, count=50, packet_size=1024)
    run_benchmark(size=2_000_000, count=20, packet_size=1024)

    # Fragmented delivery: 4 KiB packets.
    run_benchmark(size=30_000, count=100, packet_size=4096)
    run_benchmark(size=250_000, count=50, packet_size=4096)
    run_benchmark(size=2_000_000, count=20, packet_size=4096)

    # Fragmented delivery: 16 KiB packets.
    run_benchmark(size=30_000, count=100, packet_size=16384)
    run_benchmark(size=250_000, count=50, packet_size=16384)
    run_benchmark(size=2_000_000, count=20, packet_size=16384)

    # Fragmented delivery: 64 KiB packets (large frames only).
    run_benchmark(size=250_000, count=50, packet_size=65536)
    run_benchmark(size=2_000_000, count=20, packet_size=65536)