client-close.py (1820B)
from typing import Any, Dict, Optional, Tuple
from urllib.parse import urlsplit, parse_qsl


def _record_close_info(session, field: str, info: Dict[str, Any]) -> None:
    """Store ``info`` under ``field`` in the stash entry keyed by the token.

    The token is read from ``session.dict_for_handlers['token']``. Whatever
    the stash currently holds for that token (or an empty dict if it holds
    nothing truthy) is updated and written back, so values recorded by
    earlier handlers under other fields are kept.
    """
    token = session.dict_for_handlers['token']
    # NOTE(review): take() presumably removes the entry, which is why the
    # updated dict is put() back afterwards -- confirm against the stash API.
    data = session.stash.take(key=token) or {}
    data[field] = info
    session.stash.put(key=token, value=data)


def session_established(session):
    """Remember the ``token`` query parameter and open a bidirectional stream.

    The request path is taken from the ``:path`` entry of
    ``session.request_headers`` (the last one wins if it appears more than
    once). Its ``token`` query parameter is saved in
    ``session.dict_for_handlers['token']`` for the other handlers to use.

    Raises:
        Exception: if the path carries no ``token`` query parameter.
    """
    path: Optional[bytes] = None
    for key, value in session.request_headers:
        if key == b':path':
            path = value
    assert path is not None
    # The path is bytes, so parse_qsl yields bytes keys and values.
    qs = dict(parse_qsl(urlsplit(path).query))
    # .get() rather than indexing: a missing parameter must reach the
    # descriptive error below instead of surfacing as a bare KeyError.
    token = qs.get(b'token')
    if token is None:
        raise Exception('token is missing, path = {}'.format(path))
    session.dict_for_handlers['token'] = token
    session.create_bidirectional_stream()


def stream_reset(session, stream_id: int, error_code: int) -> None:
    """Record in the stash that a stream was reset, with its error code."""
    _record_close_info(session, 'stream-close-info', {
        'source': 'reset',
        'code': error_code
    })


def stream_data_received(session,
                         stream_id: int,
                         data: bytes,
                         stream_ended: bool) -> None:
    """Record in the stash that a stream ended with FIN.

    Nothing is recorded while ``stream_ended`` is false; the received
    ``data`` itself is ignored.
    """
    if stream_ended:
        _record_close_info(session, 'stream-close-info', {
            'source': 'FIN',
        })


def session_closed(
        session, close_info: Optional[Tuple[int, bytes]], abruptly: bool) -> None:
    """Record in the stash how the session was closed.

    ``close_info``, when truthy, is a ``(code, reason)`` pair; the reason
    bytes are decoded to ``str`` before being stored. Otherwise ``None`` is
    stored as the close info.
    """
    decoded_close_info: Optional[Dict[str, Any]] = None
    if close_info:
        decoded_close_info = {
            'code': close_info[0],
            'reason': close_info[1].decode()
        }

    _record_close_info(session, 'session-close-info', {
        'abruptly': abruptly,
        'close_info': decoded_close_info
    })