stash_responder_wsh.py (1621B)
#!/usr/bin/python
"""pywebsocket handler that performs stash operations for test pages.

The operation is described by the query string of the WebSocket URL:

* ``key``    -- stash key to operate on (required).
* ``action`` -- one of ``put``, ``purge`` or ``take`` (required).
* ``value``  -- value to store (required for ``put``).
* ``path``   -- stash path; defaults to the URL path of the request.

The operation runs when the client sends the text message ``"echo"``; the
JSON-encoded outcome is sent back as a single message.
"""
import json
# Import the submodule explicitly: a bare `import urllib` does not guarantee
# that `urllib.parse` is loaded.
import urllib.parse

from pywebsocket3 import msgutil
from wptserve import stash

address, authkey = stash.load_env_config()
# NOTE: this rebinds the module-level name `stash` from the imported module
# to the Stash instance; every use below refers to the instance.
stash = stash.Stash("/stash_responder", address=address, authkey=authkey)


def web_socket_do_extra_handshake(request):
    """Handshake hook; performs no extra checks on the request."""
    return


def web_socket_transfer_data(request):
    """Wait for an ``"echo"`` message, run the requested stash action, reply.

    Messages other than ``"echo"`` are ignored. After the first ``"echo"``
    has been answered the handler returns.

    Args:
        request: the pywebsocket request object; ``unparsed_uri`` supplies
            the query parameters and ``ws_stream`` the incoming messages.

    Raises:
        KeyError: if ``key``, ``action`` (or ``value`` for ``put``) is
            missing from the query string.
        ValueError: if ``action`` is not ``put``, ``purge`` or ``take``.
    """
    while True:
        line = request.ws_stream.receive_message()
        if line is None:
            # The client started the closing handshake; there is nothing
            # more to read, so stop instead of polling a closed stream.
            return
        if line != "echo":
            continue

        # Split on the first '?' only, so a query that itself contains '?'
        # is kept whole and a URI without a query yields an empty string.
        uri_path, _, query = request.unparsed_uri.partition("?")
        params = dict(urllib.parse.parse_qsl(query))

        # TODO(kristijanburnik): This code should be reused from
        # /mixed-content/generic/expect.py or implemented more generally
        # for other tests.
        path = params.get("path", uri_path)
        key = params["key"]
        action = params["action"]

        if action == "put":
            value = params["value"]
            # Remove any previous value first -- presumably because the
            # stash refuses to overwrite an existing key (confirm against
            # wptserve's Stash.put).
            stash.take(key=key, path=path)
            stash.put(key=key, value=value, path=path)
            response = {"status": "success", "result": key}
        elif action == "purge":
            value = stash.take(key=key, path=path)
            response = {"status": "success", "result": value}
        elif action == "take":
            value = stash.take(key=key, path=path)
            # No stored value is reported as "allowed", a stored value as
            # "blocked".
            status = "allowed" if value is None else "blocked"
            response = {"status": status, "result": value}
        else:
            # Fail with a clear message instead of an UnboundLocalError on
            # the response variable.
            raise ValueError("Unsupported stash action: %r" % (action,))

        msgutil.send_message(request, json.dumps(response))
        return