tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

fakegeckodriver.py (4431B)


      1 #!/usr/bin/env python3
      2 import argparse
      3 import json
      4 from http.server import BaseHTTPRequestHandler, HTTPServer
      5 from uuid import uuid4
      6 
# Registry of live sessions: maps a session id string to its Session object.
# Entries are added when a client POSTs to /session and are looked up from the
# second path segment of later requests.
_SESSIONS = {}
      8 
      9 
     10 class Window:
     11    def __init__(self, handle, title="about:blank"):
     12        self.handle = handle
     13        self.title = title
     14 
     15    def visit_url(self, url):
     16        print("Visiting %s" % url)
     17        # XXX todo, load the URL for real
     18        self.url = url
     19 
     20 
     21 class Session:
     22    def __init__(self, uuid):
     23        self.session_id = uuid
     24        self.autoinc = 0
     25        self.windows = {}
     26        self.active_handle = self.new_window()
     27 
     28    def visit(self, url):
     29        self.windows[self.active_handle].visit_url(url)
     30 
     31    def new_window(self):
     32        w = Window(self.autoinc)
     33        self.windows[w.handle] = w
     34        self.autoinc += 1
     35        return w.handle
     36 
     37 
     38 class RequestHandler(BaseHTTPRequestHandler):
     39    def _set_headers(self, status=200):
     40        self.send_response(status)
     41        self.send_header("Content-type", "application/json")
     42        self.end_headers()
     43 
     44    def _send_response(self, status=200, data=None):
     45        if data is None:
     46            data = {}
     47        data = json.dumps(data).encode("utf8")
     48        self._set_headers(status)
     49        self.wfile.write(data)
     50 
     51    def _parse_path(self):
     52        path = self.path.lstrip("/")
     53        sections = path.split("/")
     54        session = None
     55        action = []
     56        if len(sections) > 1:
     57            session_id = sections[1]
     58            if session_id in _SESSIONS:
     59                session = _SESSIONS[session_id]
     60                action = sections[2:]
     61        return session, action
     62 
     63    def do_GET(self):
     64        print("GET " + self.path)
     65        if self.path == "/status":
     66            return self._send_response(data={"ready": "OK"})
     67 
     68        session, action = self._parse_path()
     69        if action == ["window", "handles"]:
     70            data = {"value": list(session.windows.keys())}
     71            return self._send_response(data=data)
     72 
     73        if action == ["moz", "context"]:
     74            data = {"value": "chrome"}
     75            return self._send_response(data=data)
     76 
     77        return self._send_response(status=404)
     78 
     79    def do_POST(self):
     80        print("POST " + self.path)
     81        content_length = int(self.headers["Content-Length"])
     82        post_data = json.loads(self.rfile.read(content_length))
     83 
     84        # new session
     85        if self.path == "/session":
     86            uuid = str(uuid4())
     87            _SESSIONS[uuid] = Session(uuid)
     88            return self._send_response(data={"sessionId": uuid})
     89 
     90        session, action = self._parse_path()
     91        if action == ["url"]:
     92            session.visit(post_data["url"])
     93            return self._send_response()
     94 
     95        if action == ["window", "new"]:
     96            if session is None:
     97                return self._send_response(404)
     98            handle = session.new_window()
     99            return self._send_response(data={"handle": handle, "type": "tab"})
    100 
    101        if action == ["timeouts"]:
    102            return self._send_response()
    103 
    104        if action == ["execute", "async"]:
    105            return self._send_response(data={"logs": []})
    106 
    107        # other commands not supported yet, we just return 200s
    108        return self._send_response()
    109 
    110    def do_DELETE(self):
    111        return self._send_response()
    112        session, action = self._parse_path()
    113        if session is not None:
    114            del _SESSIONS[session.session_id]
    115            return self._send_response()
    116        return self._send_response(status=404)
    117 
    118 
# Text printed verbatim when the script is started with --version.
VERSION = """\
geckodriver 0.24.0 ( 2019-01-28)

The source code of this program is available from
testing/geckodriver in https://hg.mozilla.org/mozilla-central.

This program is subject to the terms of the Mozilla Public License 2.0.
You can obtain a copy of the license at https://mozilla.org/MPL/2.0/.\
"""
    128 
    129 
    130 if __name__ == "__main__":
    131    parser = argparse.ArgumentParser(description="FakeGeckodriver")
    132    parser.add_argument("--log", type=str, default=None)
    133    parser.add_argument("--port", type=int, default=4444)
    134    parser.add_argument("--marionette-port", type=int, default=2828)
    135    parser.add_argument("--version", action="store_true", default=False)
    136    parser.add_argument("--verbose", "-v", action="count")
    137    args = parser.parse_args()
    138 
    139    if args.version:
    140        print(VERSION)
    141    else:
    142        HTTPServer.allow_reuse_address = True
    143        server = HTTPServer(("127.0.0.1", args.port), RequestHandler)
    144        server.serve_forever()