deserializer.py (3984B)
# mypy: allow-untyped-defs

from ..data.session import Session, UNKNOWN
from datetime import datetime
import dateutil.parser
from dateutil.tz import tzutc


def deserialize_sessions(session_dicts):
    """Deserialize each dictionary in ``session_dicts`` into a ``Session``.

    Args:
        session_dicts: Iterable of dictionaries, each in the form accepted
            by ``deserialize_session``.

    Returns:
        A list of ``Session`` objects, in the same order as the input.
    """
    return [deserialize_session(session_dict) for session_dict in session_dicts]


def deserialize_session(session_dict):
    """Build a ``Session`` from its dictionary representation.

    Every key is optional; a missing key falls back to the default visible
    below (empty string, empty list/dict, ``None``, ``False`` or
    ``UNKNOWN``). The ``date_created``, ``date_started``, ``date_finished``
    and ``expiration_date`` values are passed through ``iso_to_millis``.

    If a ``path`` key is present, its value is split on ``", "`` and the
    resulting entries are appended to the ``include`` list of ``tests``.
    This is done on a copy, so the caller's ``session_dict`` is never
    modified.

    Args:
        session_dict: Dictionary holding the serialized session fields.

    Returns:
        The ``Session`` constructed from the extracted values.
    """
    tests = session_dict.get("tests", {"include": [], "exclude": []})
    if "path" in session_dict:
        test_paths = session_dict["path"].split(", ")
        # Copy before modifying: ``tests`` may be the very object stored in
        # the caller's dictionary, which must not be altered as a side
        # effect of deserialization.
        tests = dict(tests)
        # A ``tests`` entry without (or with an empty) ``include`` is
        # treated as having nothing included yet.
        tests["include"] = list(tests.get("include") or []) + test_paths

    # Named ``session_type`` locally to avoid shadowing the ``type`` builtin;
    # the keyword passed to ``Session`` keeps its original name.
    session_type = session_dict.get("type")

    return Session(
        token=session_dict.get("token", ""),
        tests=tests,
        test_types=session_dict.get("types", []),
        user_agent=session_dict.get("user_agent", ""),
        labels=session_dict.get("labels", []),
        timeouts=session_dict.get("timeouts", {}),
        pending_tests=session_dict.get("pending_tests"),
        running_tests=session_dict.get("running_tests"),
        status=session_dict.get("status", UNKNOWN),
        test_state=session_dict.get("test_state"),
        last_completed_test=session_dict.get("last_completed_test"),
        date_created=iso_to_millis(session_dict.get("date_created")),
        date_started=iso_to_millis(session_dict.get("date_started")),
        date_finished=iso_to_millis(session_dict.get("date_finished")),
        is_public=session_dict.get("is_public", False),
        reference_tokens=session_dict.get("reference_tokens", []),
        browser=session_dict.get("browser"),
        expiration_date=iso_to_millis(session_dict.get("expiration_date")),
        type=session_type,
        malfunctioning_tests=session_dict.get("malfunctioning_tests", [])
    )


def iso_to_millis(iso_string):
    """Convert an ISO-8601 string to milliseconds since the Unix epoch.

    A timestamp without timezone information is interpreted as UTC. A
    timestamp carrying an explicit UTC offset is converted using that
    offset rather than having it discarded.

    Args:
        iso_string: The ISO-8601 formatted string, or ``None``.

    Returns:
        ``None`` if ``iso_string`` is ``None``; the integer number of
        milliseconds since 1970-01-01T00:00:00Z if parsing succeeds;
        otherwise ``iso_string`` itself, unchanged.
    """
    if iso_string is None:
        return None
    try:
        date = dateutil.parser.isoparse(iso_string)
        if date.utcoffset() is None:
            # Naive timestamp: assume it is already expressed in UTC.
            date = date.replace(tzinfo=tzutc())
        # Explicit aware epoch; ``datetime.utcfromtimestamp`` is deprecated.
        epoch = datetime(1970, 1, 1, tzinfo=tzutc())
        # Subtracting two aware datetimes accounts for any UTC offset.
        return int((date - epoch).total_seconds() * 1000)
    except Exception:
        # Deliberate best-effort: a value that cannot be parsed (for
        # example one that is not an ISO string) is handed back as-is
        # instead of aborting deserialization of the whole session.
        return iso_string