fakegeckodriver.py (4431B)
#!/usr/bin/env python3
"""Fake geckodriver.

A tiny HTTP server that answers a handful of WebDriver-style requests with
canned JSON responses. Sessions and windows are tracked in memory only; no
browser is started and no URL is actually loaded.
"""
import argparse
import json
from http.server import BaseHTTPRequestHandler, HTTPServer
from uuid import uuid4

# Live sessions, keyed by the session id string returned by ``POST /session``.
_SESSIONS = {}


class Window:
    """In-memory stand-in for a browser window/tab."""

    def __init__(self, handle, title="about:blank"):
        self.handle = handle
        self.title = title

    def visit_url(self, url):
        """Record *url* on the window; nothing is fetched."""
        print("Visiting %s" % url)
        # XXX todo, load the URL for real
        self.url = url


class Session:
    """A fake session owning a set of windows, one of which is active."""

    def __init__(self, uuid):
        self.session_id = uuid
        # Next window handle to hand out; handles are consecutive integers.
        self.autoinc = 0
        self.windows = {}
        # Every session starts with one window, which becomes the active one.
        self.active_handle = self.new_window()

    def visit(self, url):
        """Point the active window at *url*."""
        self.windows[self.active_handle].visit_url(url)

    def new_window(self):
        """Create a window, register it, and return its handle."""
        w = Window(self.autoinc)
        self.windows[w.handle] = w
        self.autoinc += 1
        return w.handle


class RequestHandler(BaseHTTPRequestHandler):
    """Handle the GET/POST/DELETE requests understood by the fake driver."""

    def _set_headers(self, status=200):
        """Send the status line and the JSON content-type header."""
        self.send_response(status)
        self.send_header("Content-type", "application/json")
        self.end_headers()

    def _send_response(self, status=200, data=None):
        """Send *data* (default: an empty object) as a JSON response."""
        if data is None:
            data = {}
        data = json.dumps(data).encode("utf8")
        self._set_headers(status)
        self.wfile.write(data)

    def _parse_path(self):
        """Split ``self.path`` into ``(session, action)``.

        The second path component is looked up in ``_SESSIONS``; ``session``
        is ``None`` when it is not a known session id. ``action`` is the list
        of remaining path components (empty when there are none).
        """
        path = self.path.lstrip("/")
        sections = path.split("/")
        session = None
        action = []
        if len(sections) > 1:
            session_id = sections[1]
            if session_id in _SESSIONS:
                session = _SESSIONS[session_id]
            action = sections[2:]
        return session, action

    def _read_json_body(self):
        """Return the decoded JSON request body, or ``{}`` when there is none.

        Raises:
            ValueError: if ``Content-Length`` is not an integer or the body
                is not valid JSON.
        """
        # A request without a body may omit Content-Length entirely.
        content_length = int(self.headers.get("Content-Length") or 0)
        if content_length <= 0:
            return {}
        return json.loads(self.rfile.read(content_length))

    def do_GET(self):
        """Answer ``/status``, ``window/handles`` and ``moz/context``."""
        print("GET " + self.path)
        if self.path == "/status":
            return self._send_response(data={"ready": "OK"})

        session, action = self._parse_path()
        if action == ["window", "handles"]:
            # Unknown session id: answer 404 instead of crashing on None.
            if session is None:
                return self._send_response(status=404)
            data = {"value": list(session.windows.keys())}
            return self._send_response(data=data)

        if action == ["moz", "context"]:
            data = {"value": "chrome"}
            return self._send_response(data=data)

        return self._send_response(status=404)

    def do_POST(self):
        """Create sessions and handle the supported session commands.

        Unsupported commands are answered with an empty 200 response.
        """
        print("POST " + self.path)
        try:
            post_data = self._read_json_body()
        except ValueError:
            # Malformed Content-Length or body: reject rather than crash.
            return self._send_response(status=400)

        # new session
        if self.path == "/session":
            uuid = str(uuid4())
            _SESSIONS[uuid] = Session(uuid)
            return self._send_response(data={"sessionId": uuid})

        session, action = self._parse_path()
        if action == ["url"]:
            if session is None:
                return self._send_response(status=404)
            if not isinstance(post_data, dict) or "url" not in post_data:
                return self._send_response(status=400)
            session.visit(post_data["url"])
            return self._send_response()

        if action == ["window", "new"]:
            if session is None:
                return self._send_response(404)
            handle = session.new_window()
            return self._send_response(data={"handle": handle, "type": "tab"})

        if action == ["timeouts"]:
            return self._send_response()

        if action == ["execute", "async"]:
            return self._send_response(data={"logs": []})

        # other commands not supported yet, we just return 200s
        return self._send_response()

    def do_DELETE(self):
        """Forget the session on ``DELETE /session/<id>``; always reply 200.

        Only a request for the session resource itself (no trailing action)
        removes the session, so deleting a sub-resource such as a window
        leaves it in place. The reply is an unconditional 200 either way.
        """
        session, action = self._parse_path()
        if session is not None and not action:
            _SESSIONS.pop(session.session_id, None)
        return self._send_response()


VERSION = """\
geckodriver 0.24.0 ( 2019-01-28)

The source code of this program is available from
testing/geckodriver in https://hg.mozilla.org/mozilla-central.

This program is subject to the terms of the Mozilla Public License 2.0.
You can obtain a copy of the license at https://mozilla.org/MPL/2.0/.\
"""


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="FakeGeckodriver")
    # --log, --marionette-port and --verbose are accepted but not read below.
    parser.add_argument("--log", type=str, default=None)
    parser.add_argument("--port", type=int, default=4444)
    parser.add_argument("--marionette-port", type=int, default=2828)
    parser.add_argument("--version", action="store_true", default=False)
    parser.add_argument("--verbose", "-v", action="count")
    args = parser.parse_args()

    if args.version:
        print(VERSION)
    else:
        HTTPServer.allow_reuse_address = True
        server = HTTPServer(("127.0.0.1", args.port), RequestHandler)
        server.serve_forever()