tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

msg_channel_wsh.py (7837B)


      1 #!/usr/bin/python
      2 import json
      3 import logging
      4 import urllib
      5 import threading
      6 import traceback
      7 from queue import Empty
      8 
      9 from pywebsocket3 import stream, msgutil
     10 from wptserve import stash as stashmod
     11 
     12 logger = logging.getLogger()
     13 
     14 address, authkey = stashmod.load_env_config()
     15 stash = stashmod.Stash("msg_channel", address=address, authkey=authkey)
     16 
     17 # Backend for websocket based channels.
     18 #
     19 # Each socket connection has a uuid identifying the channel and a
     20 # direction which is either "read" or "write".  There can be only 1
     21 # "read" channel per uuid, but multiple "write" channels
     22 # (i.e. multiple producer, single consumer).
     23 #
     24 # The websocket connection URL contains the uuid and the direction as
     25 # named query parameters.
     26 #
     27 # Channels are backed by a queue which is stored in the stash (one
     28 # queue per uuid).
     29 #
     30 # The representation of a queue in the stash is a tuple (queue,
     31 # has_reader, writer_count).  The first field is the queue itself, the
     32 # latter are effectively reference counts for reader channels (which
     33 # is zero or one, represented by a bool) and writer channels.  Once
     34 # both counts drop to zero the queue can be deleted.
     35 #
     36 # Entries on the queue itself are formed of (command, data) pairs. The
     37 # command can be either "close", signalling the socket is closing and
     38 # the reference count on the channel should be decremented, or
     39 # "message", which indicates a message.
     40 
     41 
     42 def log(uuid, msg, level="debug"):
     43    msg = f"{uuid}: {msg}"
     44    getattr(logger, level)(msg)
     45 
     46 
     47 def web_socket_do_extra_handshake(request):
     48    return
     49 
     50 
     51 def web_socket_transfer_data(request):
     52    """Handle opening a websocket connection."""
     53 
     54    uuid, direction = parse_request(request)
     55    log(uuid, f"Got web_socket_transfer_data {direction}")
     56 
     57    # Get or create the relevant queue from the stash and update the refcount
     58    with stash.lock:
     59        value = stash.take(uuid)
     60        if value is None:
     61            queue = stash.get_queue()
     62            if direction == "read":
     63                has_reader = True
     64                writer_count = 0
     65            else:
     66                has_reader = False
     67                writer_count = 1
     68        else:
     69            queue, has_reader, writer_count = value
     70            if direction == "read":
     71                if has_reader:
     72                    raise ValueError("Tried to start multiple readers for the same queue")
     73                has_reader = True
     74            else:
     75                writer_count += 1
     76 
     77        stash.put(uuid, (queue, has_reader, writer_count))
     78 
     79    if direction == "read":
     80        run_read(request, uuid, queue)
     81    elif direction == "write":
     82        run_write(request, uuid, queue)
     83 
     84    log(uuid, f"transfer_data loop exited {direction}")
     85    close_channel(uuid, direction)
     86 
     87 
     88 def web_socket_passive_closing_handshake(request):
     89    """Handle a client initiated close.
     90 
     91    When the client closes a reader, put a message in the message
     92    queue indicating the close. For a writer we don't need special
     93    handling here because receive_message in run_read will return an
     94    empty message in this case, so that loop will exit on its own.
     95    """
     96    uuid, direction = parse_request(request)
     97    log(uuid, f"Got web_socket_passive_closing_handshake {direction}")
     98 
     99    if direction == "read":
    100        with stash.lock:
    101            data = stash.take(uuid)
    102            stash.put(uuid, data)
    103        if data is not None:
    104            queue = data[0]
    105            queue.put(("close", None))
    106 
    107    return request.ws_close_code, request.ws_close_reason
    108 
    109 
    110 def parse_request(request):
    111    query = request.unparsed_uri.split('?')[1]
    112    GET = dict(urllib.parse.parse_qsl(query))
    113    uuid = GET["uuid"]
    114    direction = GET["direction"]
    115    return uuid, direction
    116 
    117 
    118 def wait_for_close(request, uuid, queue):
    119    """Listen for messages on the socket for a read connection to a channel."""
    120    closed = False
    121    while not closed:
    122        try:
    123            msg = request.ws_stream.receive_message()
    124            if msg is None:
    125                break
    126            try:
    127                cmd, data = json.loads(msg)
    128            except ValueError:
    129                cmd = None
    130            if cmd == "close":
    131                closed = True
    132                log(uuid, "Got client initiated close")
    133            else:
    134                log(uuid, f"Unexpected message on read socket {msg}", "warning")
    135        except Exception:
    136            if not (request.server_terminated or request.client_terminated):
    137                log(uuid, f"Got exception in wait_for_close\n{traceback.format_exc()}")
    138            closed = True
    139 
    140    if not request.server_terminated:
    141        queue.put(("close", None))
    142 
    143 
    144 def run_read(request, uuid, queue):
    145    """Main loop for a read-type connection.
    146 
    147    This mostly just listens on the queue for new messages of the
    148    form (message, data). Supported messages are:
    149     message - Send `data` on the WebSocket
    150     close - Close the reader queue
    151 
    152    In addition there's a thread that listens for messages on the
    153    socket itself. Typically this socket shouldn't receive any
    154    messages, but it can receive an explicit "close" message,
    155    indicating the socket should be disconnected.
    156    """
    157 
    158    close_thread = threading.Thread(target=wait_for_close, args=(request, uuid, queue), daemon=True)
    159    close_thread.start()
    160 
    161    while True:
    162        try:
    163            data = queue.get(True, 1)
    164        except Empty:
    165            if request.server_terminated or request.client_terminated:
    166                break
    167        else:
    168            cmd, body = data
    169            log(uuid, f"queue.get ({cmd}, {body})")
    170            if cmd == "close":
    171                break
    172            if cmd == "message":
    173                msgutil.send_message(request, json.dumps(body))
    174            else:
    175                log(uuid, f"Unknown queue command {cmd}", level="warning")
    176 
    177 
    178 def run_write(request, uuid, queue):
    179    """Main loop for a write-type connection.
    180 
    181    Messages coming over the socket have the format (command, data).
    182    The recognised commands are:
    183     message - Send the message `data` over the channel.
    184     disconnectReader - Close the reader connection for this channel.
    185     delete - Force-delete the entire channel and the underlying queue.
    186    """
    187    while True:
    188        msg = request.ws_stream.receive_message()
    189        if msg is None:
    190            break
    191        cmd, body = json.loads(msg)
    192        if cmd == "disconnectReader":
    193            queue.put(("close", None))
    194        elif cmd == "message":
    195            log(uuid, f"queue.put ({cmd}, {body})")
    196            queue.put((cmd, body))
    197        elif cmd == "delete":
    198            close_channel(uuid, None)
    199 
    200 
    201 def close_channel(uuid, direction):
    202    """Update the channel state in the stash when closing a connection
    203 
    204    This updates the stash entry, including refcounts, once a
    205    connection to a channel is closed.
    206 
    207    Params:
    208    uuid - the UUID of the channel being closed.
    209    direction - "read" if a read connection was closed, "write" if a
    210                write connection was closed, None to remove the
    211                underlying queue from the stash entirely.
    212 
    213    """
    214    log(uuid, f"Got close_channel {direction}")
    215    with stash.lock:
    216        data = stash.take(uuid)
    217        if data is None:
    218            log(uuid, "Message queue already deleted")
    219            return
    220        if direction is None:
    221            # Return without replacing the channel in the stash
    222            log(uuid, "Force deleting message queue")
    223            return
    224        queue, has_reader, writer_count = data
    225        if direction == "read":
    226            has_reader = False
    227        else:
    228            writer_count -= 1
    229 
    230        if has_reader or writer_count > 0 or not queue.empty():
    231            log(uuid, f"Updating refcount {has_reader}, {writer_count}")
    232            stash.put(uuid, (queue, has_reader, writer_count))
    233        else:
    234            log(uuid, "Deleting message queue")