tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

client-close.py (1820B)


      1 from typing import Optional, Tuple
      2 from urllib.parse import urlsplit, parse_qsl
      3 
      4 
      5 def session_established(session):
      6    path: Optional[bytes] = None
      7    for key, value in session.request_headers:
      8        if key == b':path':
      9            path = value
     10    assert path is not None
     11    qs = dict(parse_qsl(urlsplit(path).query))
     12    token = qs[b'token']
     13    if token is None:
     14        raise Exception('token is missing, path = {}'.format(path))
     15    session.dict_for_handlers['token'] = token
     16    session.create_bidirectional_stream()
     17 
     18 
     19 def stream_reset(session, stream_id: int, error_code: int) -> None:
     20    token = session.dict_for_handlers['token']
     21    data = session.stash.take(key=token) or {}
     22 
     23    data['stream-close-info'] = {
     24        'source': 'reset',
     25        'code': error_code
     26    }
     27    session.stash.put(key=token, value=data)
     28 
     29 
     30 def stream_data_received(session,
     31                         stream_id: int,
     32                         data: bytes,
     33                         stream_ended: bool):
     34    if stream_ended:
     35        token = session.dict_for_handlers['token']
     36        stashed_data = session.stash.take(key=token) or {}
     37        stashed_data['stream-close-info'] = {
     38            'source': 'FIN',
     39        }
     40        session.stash.put(key=token, value=stashed_data)
     41 
     42 
     43 def session_closed(
     44        session, close_info: Optional[Tuple[int, bytes]], abruptly: bool) -> None:
     45    token = session.dict_for_handlers['token']
     46    data = session.stash.take(key=token) or {}
     47 
     48    decoded_close_info: Optional[Dict[str, Any]] = None
     49    if close_info:
     50        decoded_close_info = {
     51            'code': close_info[0],
     52            'reason': close_info[1].decode()
     53        }
     54 
     55    data['session-close-info'] = {
     56        'abruptly': abruptly,
     57        'close_info': decoded_close_info
     58    }
     59    session.stash.put(key=token, value=data)