api_handler.py (3318B)
1 # mypy: allow-untyped-defs 2 3 import json 4 import sys 5 import traceback 6 import logging 7 8 from urllib.parse import parse_qsl 9 10 global logger 11 logger = logging.getLogger("wave-api-handler") 12 13 14 class ApiHandler: 15 def __init__(self, web_root): 16 self._web_root = web_root 17 18 def set_headers(self, response, headers): 19 if not isinstance(response.headers, list): 20 response.headers = [] 21 for header in headers: 22 response.headers.append(header) 23 24 def send_json(self, data, response, status=None): 25 if status is None: 26 status = 200 27 json_string = json.dumps(data, indent=4) 28 response.content = json_string 29 self.set_headers(response, [("Content-Type", "application/json")]) 30 response.status = status 31 32 def send_file(self, blob, file_name, response): 33 self.set_headers(response, 34 [("Content-Disposition", 35 "attachment;filename=" + file_name)]) 36 response.content = blob 37 38 def send_zip(self, data, file_name, response): 39 response.headers = [("Content-Type", "application/x-compressed")] 40 self.send_file(data, file_name, response) 41 42 def parse_uri(self, request): 43 path = request.url_parts.path 44 if self._web_root is not None: 45 path = path[len(self._web_root):] 46 47 uri_parts = list(filter(None, path.split("/"))) 48 return uri_parts 49 50 def parse_query_parameters(self, request): 51 return dict(parse_qsl(request.url_parts.query)) 52 53 def handle_exception(self, message): 54 info = sys.exc_info() 55 traceback.print_tb(info[2]) 56 logger.error(f"{message}: {info[0].__name__}: {info[1].args[0]}") 57 58 def create_hal_list(self, items, uris, index, count, total): 59 hal_list = {} 60 links = {} 61 if uris is not None: 62 for relation in uris: 63 if relation == "self": 64 continue 65 uri = uris[relation] 66 templated = "{" in uri 67 links[relation] = {"href": uri, "templated": templated} 68 69 if "self" in uris: 70 self_uri = uris["self"] 71 self_uri += f"?index={index}&count={count}" 72 links["self"] = {"href": self_uri} 73 74 first_uri = uris["self"] 75 first_uri += f"?index={0}&count={count}" 76 links["first"] = {"href": first_uri} 77 78 last_uri = uris["self"] 79 last_uri += f"?index={total - (total % count)}&count={count}" 80 links["last"] = {"href": last_uri} 81 82 if index + count <= total: 83 next_index = index + count 84 next_uri = uris["self"] 85 next_uri += f"?index={next_index}&count={count}" 86 links["next"] = {"href": next_uri} 87 88 if index != 0: 89 previous_index = index - count 90 if previous_index < 0: 91 previous_index = 0 92 previous_uri = uris["self"] 93 previous_uri += f"?index={previous_index}&count={count}" 94 links["previous"] = {"href": previous_uri} 95 96 hal_list["_links"] = links 97 hal_list["items"] = items 98 99 return hal_list