subresource.py (7349B)
1 import os, json 2 from urllib.parse import parse_qsl, SplitResult, urlencode, urlsplit, urlunsplit 3 4 from wptserve.utils import isomorphic_decode, isomorphic_encode 5 6 def get_template(template_basename): 7 script_directory = os.path.dirname(os.path.abspath(isomorphic_decode(__file__))) 8 template_directory = os.path.abspath(os.path.join(script_directory, 9 u"template")) 10 template_filename = os.path.join(template_directory, template_basename) 11 12 with open(template_filename, "r") as f: 13 return f.read() 14 15 16 def redirect(url, response): 17 response.add_required_headers = False 18 response.writer.write_status(301) 19 response.writer.write_header(b"access-control-allow-origin", b"*") 20 response.writer.write_header(b"location", isomorphic_encode(url)) 21 response.writer.end_headers() 22 response.writer.write(u"") 23 24 25 # TODO(kristijanburnik): subdomain_prefix is a hardcoded value aligned with 26 # referrer-policy-test-case.js. The prefix should be configured in one place. 27 def __get_swapped_origin_netloc(netloc, subdomain_prefix = u"www1."): 28 if netloc.startswith(subdomain_prefix): 29 return netloc[len(subdomain_prefix):] 30 else: 31 return subdomain_prefix + netloc 32 33 34 # Creates a URL (typically a redirect target URL) that is the same as the 35 # current request URL `request.url`, except for: 36 # - When `swap_scheme` or `swap_origin` is True, its scheme/origin is changed 37 # to the other one. (http <-> https, ws <-> wss, etc.) 38 # - For `downgrade`, we redirect to a URL that would be successfully loaded 39 # if and only if upgrade-insecure-request is applied. 40 # - `query_parameter_to_remove` parameter is removed from query part. 41 # Its default is "redirection" to avoid redirect loops. 42 def create_url(request, 43 swap_scheme=False, 44 swap_origin=False, 45 downgrade=False, 46 query_parameter_to_remove=u"redirection"): 47 parsed = urlsplit(request.url) 48 destination_netloc = parsed.netloc 49 50 scheme = parsed.scheme 51 if swap_scheme: 52 scheme = u"http" if parsed.scheme == u"https" else u"https" 53 hostname = parsed.netloc.split(u':')[0] 54 port = request.server.config[u"ports"][scheme][0] 55 destination_netloc = u":".join([hostname, str(port)]) 56 57 if downgrade: 58 # These rely on some unintuitive cleverness due to WPT's test setup: 59 # 'Upgrade-Insecure-Requests' does not upgrade the port number, 60 # so we use URLs in the form `http://[domain]:[https-port]`, 61 # which will be upgraded to `https://[domain]:[https-port]`. 62 # If the upgrade fails, the load will fail, as we don't serve HTTP over 63 # the secure port. 64 if parsed.scheme == u"https": 65 scheme = u"http" 66 elif parsed.scheme == u"wss": 67 scheme = u"ws" 68 else: 69 raise ValueError(u"Downgrade redirection: Invalid scheme '%s'" % 70 parsed.scheme) 71 hostname = parsed.netloc.split(u':')[0] 72 port = request.server.config[u"ports"][parsed.scheme][0] 73 destination_netloc = u":".join([hostname, str(port)]) 74 75 if swap_origin: 76 destination_netloc = __get_swapped_origin_netloc(destination_netloc) 77 78 parsed_query = parse_qsl(parsed.query, keep_blank_values=True) 79 parsed_query = [x for x in parsed_query if x[0] != query_parameter_to_remove] 80 81 destination_url = urlunsplit(SplitResult( 82 scheme = scheme, 83 netloc = destination_netloc, 84 path = parsed.path, 85 query = urlencode(parsed_query), 86 fragment = None)) 87 88 return destination_url 89 90 91 def preprocess_redirection(request, response): 92 if b"redirection" not in request.GET: 93 return False 94 95 redirection = request.GET[b"redirection"] 96 97 if redirection == b"no-redirect": 98 return False 99 elif redirection == b"keep-scheme": 100 redirect_url = create_url(request, swap_scheme=False) 101 elif redirection == b"swap-scheme": 102 redirect_url = create_url(request, swap_scheme=True) 103 elif redirection == b"downgrade": 104 redirect_url = create_url(request, downgrade=True) 105 elif redirection == b"keep-origin": 106 redirect_url = create_url(request, swap_origin=False) 107 elif redirection == b"swap-origin": 108 redirect_url = create_url(request, swap_origin=True) 109 else: 110 raise ValueError(u"Invalid redirection type '%s'" % isomorphic_decode(redirection)) 111 112 redirect(redirect_url, response) 113 return True 114 115 116 def preprocess_stash_action(request, response): 117 if b"action" not in request.GET: 118 return False 119 120 action = request.GET[b"action"] 121 122 key = request.GET[b"key"] 123 stash = request.server.stash 124 path = request.GET[b"path"] if b"path" in request.GET \ 125 else isomorphic_encode(request.url.split(u'?')[0]) 126 127 if action == b"put": 128 value = isomorphic_decode(request.GET[b"value"]) 129 stash.take(key=key, path=path) 130 stash.put(key=key, value=value, path=path) 131 response_data = json.dumps({u"status": u"success", u"result": isomorphic_decode(key)}) 132 elif action == b"purge": 133 value = stash.take(key=key, path=path) 134 return False 135 elif action == b"take": 136 value = stash.take(key=key, path=path) 137 if value is None: 138 status = u"allowed" 139 else: 140 status = u"blocked" 141 response_data = json.dumps({u"status": status, u"result": value}) 142 else: 143 return False 144 145 response.add_required_headers = False 146 response.writer.write_status(200) 147 response.writer.write_header(b"content-type", b"text/javascript") 148 response.writer.write_header(b"cache-control", b"no-cache; must-revalidate") 149 response.writer.end_headers() 150 response.writer.write(response_data) 151 return True 152 153 154 def __noop(request, response): 155 return u"" 156 157 158 def respond(request, 159 response, 160 status_code = 200, 161 content_type = b"text/html", 162 payload_generator = __noop, 163 cache_control = b"no-cache; must-revalidate", 164 access_control_allow_origin = b"*", 165 maybe_additional_headers = None): 166 if preprocess_redirection(request, response): 167 return 168 169 if preprocess_stash_action(request, response): 170 return 171 172 response.add_required_headers = False 173 response.writer.write_status(status_code) 174 175 if access_control_allow_origin != None: 176 response.writer.write_header(b"access-control-allow-origin", 177 access_control_allow_origin) 178 response.writer.write_header(b"content-type", content_type) 179 response.writer.write_header(b"cache-control", cache_control) 180 181 additional_headers = maybe_additional_headers or {} 182 for header, value in additional_headers.items(): 183 response.writer.write_header(header, value) 184 185 response.writer.end_headers() 186 187 new_headers = {} 188 new_val = [] 189 for key, val in request.headers.items(): 190 if len(val) == 1: 191 new_val = isomorphic_decode(val[0]) 192 else: 193 new_val = [isomorphic_decode(x) for x in val] 194 new_headers[isomorphic_decode(key)] = new_val 195 196 server_data = {u"headers": json.dumps(new_headers, indent = 4)} 197 198 payload = payload_generator(server_data) 199 response.writer.write(payload)