session_manager.py (12420B)
1 import json 2 3 test_to_session_manager_mapping = {} 4 5 def initialize_test(): 6 test_id = str(len(test_to_session_manager_mapping)) 7 test_to_session_manager_mapping[test_id] = SessionManager() 8 return test_id 9 10 def find_for_request(request): 11 test_id = request.cookies.get(b'test_id').value.decode('utf-8') 12 manager = test_to_session_manager_mapping.get(test_id) 13 if manager == None: 14 raise Exception(f"Could not find manager for test_id: {test_id}") 15 return manager 16 17 class CookieDetail: 18 def __init__(self, name_and_value = None, attributes = None): 19 self.name_and_value = name_and_value 20 self.attributes = attributes 21 22 def get_name_and_value(self): 23 if self.name_and_value is None: 24 return "auth_cookie=abcdef0123" 25 return self.name_and_value 26 27 def get_attributes(self, request): 28 if self.attributes is None: 29 return f"Domain={request.url_parts.hostname}; Path=/device-bound-session-credentials" 30 return self.attributes 31 32 class SessionManager: 33 def __init__(self): 34 self.session_to_key_map = {} 35 self.should_refresh_end_session = False 36 self.authorization_value = None 37 self.scope_origin = None 38 self.registration_sends_challenge_before_instructions = False 39 self.registration_sends_challenge_with_instructions = False 40 self.cookie_details = None 41 self.session_to_cookie_details_map = {} 42 self.session_to_early_challenge_map = {} 43 self.has_called_refresh = False 44 self.scope_specification_items = [] 45 self.refresh_sends_challenge = True 46 self.refresh_url = "/device-bound-session-credentials/refresh_session.py" 47 self.include_site = True 48 self.refresh_endpoint_unavailable = False 49 self.response_session_id_override = None 50 self.allowed_refresh_initiators = ["*"] 51 self.provider_session_id = None 52 self.provider_url = None 53 self.provider_key = None 54 self.use_empty_response = False 55 self.registration_extra_cookies = [] 56 self.has_custom_query_param = False 57 self.allows_challenges = True 58 59 def next_session_id(self): 60 return len(self.session_to_key_map) 61 62 def create_new_session(self): 63 session_id = self.next_session_id() 64 self.session_to_key_map[session_id] = None 65 return session_id 66 67 def set_session_key(self, session_id, key): 68 if session_id not in self.session_to_key_map: 69 return False 70 self.session_to_key_map[session_id] = key 71 return True 72 73 def get_session_key(self, session_id): 74 return self.session_to_key_map.get(session_id) 75 76 def get_session_ids(self): 77 return list(self.session_to_key_map.keys()) 78 79 def configure_state_for_test(self, configuration): 80 should_refresh_end_session = configuration.get("shouldRefreshEndSession") 81 if should_refresh_end_session is not None: 82 self.should_refresh_end_session = should_refresh_end_session 83 84 authorization_value = configuration.get("authorizationValue") 85 if authorization_value is not None: 86 self.authorization_value = authorization_value 87 88 scope_origin = configuration.get("scopeOrigin") 89 if scope_origin is not None: 90 self.scope_origin = scope_origin 91 92 registration_sends_challenge_before_instructions = configuration.get("registrationSendsChallengeBeforeInstructions") 93 if registration_sends_challenge_before_instructions is not None: 94 self.registration_sends_challenge_before_instructions = registration_sends_challenge_before_instructions 95 96 registration_sends_challenge_with_instructions = configuration.get("registrationSendsChallengeWithInstructions") 97 if registration_sends_challenge_with_instructions is not None: 98 self.registration_sends_challenge_with_instructions = registration_sends_challenge_with_instructions 99 100 cookie_details = configuration.get("cookieDetails") 101 if cookie_details is not None: 102 self.cookie_details = [] 103 for detail in cookie_details: 104 self.cookie_details.append(CookieDetail(detail.get("nameAndValue"), detail.get("attributes"))) 105 106 next_sessions_cookie_details = configuration.get("cookieDetailsForNextRegisteredSessions") 107 if next_sessions_cookie_details is not None: 108 next_session_id = self.next_session_id() 109 for session in next_sessions_cookie_details: 110 self.session_to_cookie_details_map[next_session_id] = [] 111 for detail in session: 112 self.session_to_cookie_details_map[next_session_id].append(CookieDetail(detail.get("nameAndValue"), detail.get("attributes"))) 113 next_session_id += 1 114 115 next_session_early_challenge = configuration.get("earlyChallengeForNextRegisteredSession") 116 if next_session_early_challenge is not None: 117 self.session_to_early_challenge_map[self.next_session_id()] = next_session_early_challenge 118 119 scope_specification_items = configuration.get("scopeSpecificationItems") 120 if scope_specification_items is not None: 121 self.scope_specification_items = scope_specification_items 122 123 refresh_sends_challenge = configuration.get("refreshSendsChallenge") 124 if refresh_sends_challenge is not None: 125 self.refresh_sends_challenge = refresh_sends_challenge 126 127 refresh_url = configuration.get("refreshUrl") 128 if refresh_url is not None: 129 self.refresh_url = refresh_url 130 131 include_site = configuration.get("includeSite") 132 if include_site is not None: 133 self.include_site = include_site 134 135 refresh_endpoint_unavailable = configuration.get("refreshEndpointUnavailable") 136 if refresh_endpoint_unavailable is not None: 137 self.refresh_endpoint_unavailable = refresh_endpoint_unavailable 138 139 response_session_id_override = configuration.get("responseSessionIdOverride") 140 if response_session_id_override is not None: 141 self.response_session_id_override = response_session_id_override 142 143 allowed_refresh_initiators = configuration.get("allowedRefreshInitiators") 144 if allowed_refresh_initiators is not None: 145 self.allowed_refresh_initiators = allowed_refresh_initiators 146 147 provider_session_id = configuration.get("providerSessionId") 148 if provider_session_id is not None: 149 self.provider_session_id = provider_session_id 150 151 provider_url = configuration.get("providerUrl") 152 if provider_url is not None: 153 self.provider_url = provider_url 154 155 provider_key = configuration.get("providerKey") 156 if provider_key is not None: 157 self.provider_key = provider_key 158 159 use_empty_response = configuration.get("useEmptyResponse") 160 if use_empty_response is not None: 161 self.use_empty_response = use_empty_response 162 163 registration_extra_cookies = configuration.get("registrationExtraCookies") 164 if registration_extra_cookies is not None: 165 self.registration_extra_cookies = [] 166 for detail in registration_extra_cookies: 167 self.registration_extra_cookies.append(CookieDetail(detail.get("nameAndValue"), detail.get("attributes"))) 168 169 has_custom_query_param = configuration.get("hasCustomQueryParam") 170 if has_custom_query_param is not None: 171 self.has_custom_query_param = has_custom_query_param 172 173 allows_challenges = configuration.get("allowsChallenges") 174 if allows_challenges is not None: 175 self.allows_challenges = allows_challenges 176 177 def get_should_refresh_end_session(self): 178 return self.should_refresh_end_session 179 180 def get_authorization_value(self): 181 return self.authorization_value 182 183 def get_registration_sends_challenge_before_instructions(self): 184 return self.registration_sends_challenge_before_instructions 185 186 def reset_registration_sends_challenge_before_instructions(self): 187 self.registration_sends_challenge_before_instructions = False 188 189 def get_registration_sends_challenge_with_instructions(self): 190 return self.registration_sends_challenge_with_instructions 191 192 def reset_registration_sends_challenge_with_instructions(self): 193 self.registration_sends_challenge_with_instructions = False 194 195 def get_refresh_sends_challenge(self): 196 return self.refresh_sends_challenge 197 198 def set_has_called_refresh(self, has_called_refresh): 199 self.has_called_refresh = has_called_refresh 200 201 def get_has_custom_query_param(self): 202 return self.has_custom_query_param 203 204 def pull_server_state(self): 205 return { 206 "hasCalledRefresh": self.has_called_refresh 207 } 208 209 def get_cookie_details(self, session_id): 210 # Try to use the session-specific override first. 211 if self.session_to_cookie_details_map.get(session_id) is not None: 212 return self.session_to_cookie_details_map[session_id] 213 # If there isn't any, use the general override. 214 if self.cookie_details is not None: 215 return self.cookie_details 216 return [CookieDetail()] 217 218 def get_early_challenge(self, session_id): 219 return self.session_to_early_challenge_map.get(session_id) 220 221 def get_refresh_url(self): 222 if not self.has_custom_query_param: 223 return self.refresh_url 224 return self.refresh_url + "?refreshQueryParam=456" 225 226 def get_sessions_instructions_response_credentials(self, session_id, request): 227 return list(map(lambda cookie_detail: { 228 "type": "cookie", 229 "name": cookie_detail.get_name_and_value().split("=")[0], 230 "attributes": cookie_detail.get_attributes(request) 231 }, self.get_cookie_details(session_id))) 232 233 def get_set_cookie_headers(self, cookies, request): 234 header_values = list(map( 235 lambda cookie_detail: f"{cookie_detail.get_name_and_value()}; {cookie_detail.get_attributes(request)}", 236 cookies 237 )) 238 return [("Set-Cookie", header_value) for header_value in header_values] 239 240 def get_session_instructions_response(self, session_id, request): 241 response_session_id = session_id 242 if self.response_session_id_override is not None: 243 response_session_id = self.response_session_id_override 244 245 scope_origin = "" 246 if self.scope_origin is not None: 247 scope_origin = self.scope_origin 248 249 response_body = { 250 "session_identifier": str(response_session_id), 251 "refresh_url": self.get_refresh_url(), 252 "scope": { 253 "origin": scope_origin, 254 "include_site": self.include_site, 255 "scope_specification" : self.scope_specification_items + [ 256 { "type": "exclude", "domain": request.url_parts.hostname, "path": "/device-bound-session-credentials/request_early_challenge.py" }, 257 { "type": "exclude", "domain": request.url_parts.hostname, "path": "/device-bound-session-credentials/end_session_via_clear_site_data.py" }, 258 { "type": "exclude", "domain": request.url_parts.hostname, "path": "/device-bound-session-credentials/pull_server_state.py" }, 259 { "type": "exclude", "domain": request.url_parts.hostname, "path": "/device-bound-session-credentials/set_cookie.py" }, 260 ] 261 }, 262 "credentials": self.get_sessions_instructions_response_credentials(session_id, request), 263 "allowed_refresh_initiators": self.allowed_refresh_initiators, 264 } 265 headers = self.get_set_cookie_headers(self.get_cookie_details(session_id), request) + [ 266 ("Content-Type", "application/json"), 267 ("Cache-Control", "no-store") 268 ] 269 270 response_body = "" if self.use_empty_response else json.dumps(response_body) 271 return (200, headers, response_body) 272 273 def get_refresh_endpoint_unavailable(self): 274 return self.refresh_endpoint_unavailable 275 276 def get_provider_session_id(self): 277 return self.provider_session_id 278 279 def get_provider_url(self): 280 return self.provider_url 281 282 def get_provider_key(self): 283 return self.provider_key 284 285 def get_allows_challenges(self): 286 return self.allows_challenges