real-time-report.py (2713B)
1 from typing import List, Tuple, Union 2 3 from fledge.tentative.resources import fledge_http_server_util 4 from wptserve.request import Request 5 from wptserve.stash import Stash 6 from wptserve.utils import isomorphic_decode, isomorphic_encode 7 8 # Arbitrary key used to access the reports in the stash. 9 REPORTS_KEY = "9d285691-4386-45ad-9a79-d2ec29557cde" 10 11 CLEAR_STASH_AS_BYTES = isomorphic_encode("clear_stash") 12 13 Header = Tuple[str, str] 14 Status = Union[int, Tuple[int, str]] 15 Response = Tuple[Status, List[Header], str] 16 17 def get_request_origin(request: Request) -> str: 18 return "%s://%s" % (request.url_parts.scheme, 19 request.url_parts.netloc) 20 21 def handle_post_request(request: Request) -> Response: 22 """Handles POST request for reports. 23 24 Retrieves the report from the request body and stores the report in the 25 stash. If clear_stash is specified in the query params, clears the stash. 26 """ 27 28 if request.GET.get(CLEAR_STASH_AS_BYTES): 29 clear_stash(request.server.stash) 30 return 200, [], "Stash successfully cleared." 31 32 store_report(request.server.stash, get_request_origin(request), 33 request.body) 34 return 200, [], "" 35 36 def handle_get_request(request: Request) -> Response: 37 """Handles GET request for reports. 38 39 Retrieves and returns all reports from the stash. 40 """ 41 headers = [("Content-Type", "text/plain")] 42 reports = take_reports(request.server.stash, get_request_origin(request)) 43 headers.append(("Access-Control-Allow-Origin", "*")) 44 return 200, headers, reports 45 46 def store_report(stash: Stash, origin: str, report: str) -> None: 47 """Stores the report in the stash.""" 48 with stash.lock: 49 reports_dict = stash.take(REPORTS_KEY) 50 if not reports_dict: 51 reports_dict = {} 52 reports = reports_dict.get(origin, []) 53 reports.append(report) 54 reports_dict[origin] = reports 55 stash.put(REPORTS_KEY, reports_dict) 56 return None 57 58 def clear_stash(stash: Stash) -> None: 59 "Clears the stash." 60 stash.take(REPORTS_KEY) 61 return None 62 63 def take_reports(stash: Stash, origin: str) -> List[str]: 64 """Takes all the reports from the stash and returns them.""" 65 with stash.lock: 66 reports_dict = stash.take(REPORTS_KEY) 67 if not reports_dict: 68 reports_dict = {} 69 70 reports = reports_dict.pop(origin, []) 71 stash.put(REPORTS_KEY, reports_dict) 72 return reports 73 74 def handle_request(request: Request) -> Response: 75 """Handles request to get or store reports.""" 76 if request.method == "POST": 77 return handle_post_request(request) 78 if request.method == "GET": 79 return handle_get_request(request) 80 81 return ( 82 (405, "Method Not Allowed"), 83 [("Content-Type", "text/plain")], 84 "Only GET or POST methods are supported.", 85 )