tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

helpers.py (10058B)


      1 import argparse
      2 import json
      3 import os
      4 import re
      5 import socket
      6 import subprocess
      7 import sys
      8 import tempfile
      9 import threading
     10 import time
     11 from contextlib import suppress
     12 from copy import deepcopy
     13 from urllib.parse import urlparse
     14 
     15 import webdriver
     16 from mozprofile import Preferences, Profile
     17 from mozrunner import FirefoxRunner
     18 
     19 from .context import using_context
     20 
     21 
     22 class Browser:
     23    def __init__(
     24        self,
     25        binary,
     26        profile,
     27        env=None,
     28        extra_args=None,
     29        extra_prefs=None,
     30        log_level=None,
     31        truncate_enabled=True,
     32        use_bidi=False,
     33        use_marionette=False,
     34    ):
     35        self.profile = profile
     36 
     37        self.extra_args = extra_args
     38        self.extra_prefs = extra_prefs
     39        self.log_level = log_level
     40        self.truncate_enabled = truncate_enabled
     41        self.use_bidi = use_bidi
     42        self.use_marionette = use_marionette
     43 
     44        self.bidi_port_file = None
     45        self.remote_agent_host = None
     46        self.remote_agent_port = None
     47 
     48        cmdargs = ["-no-remote"]
     49 
     50        if use_bidi:
     51            self.webdriver_bidi_file = os.path.join(
     52                self.profile.profile, "WebDriverBiDiServer.json"
     53            )
     54            with suppress(FileNotFoundError):
     55                os.remove(self.webdriver_bidi_file)
     56 
     57        if use_marionette:
     58            cmdargs.extend(["-marionette"])
     59 
     60        # Avoid modifying extra_prefs to prevent side-effects with the "browser" fixture,
     61        # which checks session equality and would create a new session each time.
     62        prefs = self.extra_prefs or {}
     63 
     64        if log_level is not None:
     65            prefs.update({"remote.log.level": log_level})
     66 
     67        if truncate_enabled is False:
     68            prefs.update({"remote.log.truncate": False})
     69 
     70        self.profile.set_preferences(prefs)
     71 
     72        if self.use_bidi:
     73            cmdargs.extend(["--remote-debugging-port", "0"])
     74        if self.extra_args is not None:
     75            cmdargs.extend(self.extra_args)
     76 
     77        print(f"Run command: {binary} {cmdargs}")
     78        self.runner = FirefoxRunner(
     79            binary=binary, profile=self.profile, cmdargs=cmdargs, env=env
     80        )
     81 
     82    @property
     83    def websocket_url(self):
     84        if self.remote_agent_host is None or self.remote_agent_port is None:
     85            raise Exception("No WebSocket server running")
     86 
     87        return f"ws://{self.remote_agent_host}:{self.remote_agent_port}"
     88 
     89    @property
     90    def is_running(self):
     91        return self.runner.is_running()
     92 
     93    def start(self):
     94        # Start Firefox.
     95        self.runner.start()
     96 
     97        if self.use_bidi:
     98            # Wait until the WebDriverBiDiServer.json file is ready
     99            while not os.path.exists(self.webdriver_bidi_file):
    100                time.sleep(0.1)
    101 
    102            # Read the connection details from file
    103            data = json.loads(open(self.webdriver_bidi_file).read())
    104            self.remote_agent_host = data["ws_host"]
    105            self.remote_agent_port = int(data["ws_port"])
    106 
    107    def quit(self, clean_profile=True):
    108        if self.is_running:
    109            self.runner.stop()
    110            self.runner.cleanup()
    111 
    112        if clean_profile:
    113            self.profile.cleanup()
    114 
    115    def wait(self):
    116        if self.is_running is True:
    117            self.runner.wait()
    118 
    119 
    120 class Geckodriver:
    121    PORT_RE = re.compile(rb".*Listening on [^ :]*:(\d+)")
    122 
    123    def __init__(self, configuration, hostname=None, extra_args=None, extra_env=None):
    124        self.config = configuration["webdriver"]
    125        self.requested_capabilities = configuration["capabilities"]
    126        self.hostname = hostname or configuration["host"]
    127        self.extra_args = extra_args or []
    128        self.env = configuration["browser"]["env"]
    129        self.extra_env = extra_env or {}
    130 
    131        self.command = None
    132        self.proc = None
    133        self.port = None
    134        self.reader_thread = None
    135 
    136        self.capabilities = {"alwaysMatch": self.requested_capabilities}
    137        self.session = None
    138 
    139    @property
    140    def remote_agent_port(self):
    141        webSocketUrl = self.session.capabilities.get("webSocketUrl")
    142        assert webSocketUrl is not None
    143 
    144        return urlparse(webSocketUrl).port
    145 
    146    def start(self):
    147        self.command = (
    148            [self.config["binary"], "--port", "0"]
    149            + self.config["args"]
    150            + self.extra_args
    151        )
    152 
    153        print(f"Running command: {' '.join(self.command)}")
    154        all_env = deepcopy(self.env)
    155        all_env.update(self.extra_env)
    156 
    157        self.proc = subprocess.Popen(self.command, env=all_env, stdout=subprocess.PIPE)
    158 
    159        self.reader_thread = threading.Thread(
    160            target=readOutputLine,
    161            args=(self.proc.stdout, self.processOutputLine),
    162            daemon=True,
    163        )
    164        self.reader_thread.start()
    165        # Wait for the port to become ready
    166        end_time = time.time() + 10
    167        while time.time() < end_time:
    168            returncode = self.proc.poll()
    169            if returncode is not None:
    170                raise ChildProcessError(
    171                    f"geckodriver terminated with code {returncode}"
    172                )
    173            if self.port is not None:
    174                with socket.socket() as sock:
    175                    if sock.connect_ex((self.hostname, self.port)) == 0:
    176                        break
    177            else:
    178                time.sleep(0.1)
    179        else:
    180            if self.port is None:
    181                raise OSError(
    182                    f"Failed to read geckodriver port started on {self.hostname}"
    183                )
    184            raise ConnectionRefusedError(
    185                f"Failed to connect to geckodriver on {self.hostname}:{self.port}"
    186            )
    187 
    188        self.session = webdriver.Session(
    189            self.hostname, self.port, capabilities=self.capabilities
    190        )
    191 
    192        return self
    193 
    194    def processOutputLine(self, line):
    195        sys.stdout.write(line)
    196 
    197        if self.port is None:
    198            m = self.PORT_RE.match(line)
    199            if m is not None:
    200                self.port = int(m.groups()[0])
    201 
    202    async def stop(self):
    203        if self.session is not None:
    204            await self.delete_session()
    205        if self.proc:
    206            self.proc.kill()
    207        self.port = None
    208        if self.reader_thread is not None:
    209            self.reader_thread.join()
    210 
    211    def new_session(self):
    212        self.session.start()
    213 
    214    async def delete_session(self):
    215        if self.session.bidi_session is not None:
    216            await self.session.bidi_session.end()
    217 
    218        self.session.end()
    219 
    220 
    221 def clear_pref(session, pref):
    222    """Clear the user-defined value from the specified preference.
    223 
    224    :param pref: Name of the preference.
    225    """
    226    with using_context(session, "chrome"):
    227        session.execute_script(
    228            """
    229           const { Preferences } = ChromeUtils.importESModule(
    230             "resource://gre/modules/Preferences.sys.mjs"
    231           );
    232           Preferences.reset(arguments[0]);
    233           """,
    234            args=(pref,),
    235        )
    236 
    237 
    238 def create_custom_profile(base_profile, default_preferences, clone=True):
    239    if clone:
    240        # Clone the current profile and remove the prefs.js file to only
    241        # keep default preferences as set in user.js.
    242        profile = Profile.clone(base_profile)
    243        prefs_path = os.path.join(profile.profile, "prefs.js")
    244        if os.path.exists(prefs_path):
    245            os.remove(prefs_path)
    246    else:
    247        profile = Profile(tempfile.mkdtemp(prefix="wdspec-"))
    248        profile.set_preferences(default_preferences)
    249 
    250    return profile
    251 
    252 
    253 def get_arg_value(arg_names, args):
    254    """Get an argument value from a list of arguments
    255 
    256    This assumes that argparse argument parsing is close enough to the target
    257    to be compatible, at least with the set of inputs we have.
    258 
    259    :param arg_names: - List of names for the argument e.g. ["--foo", "-f"]
    260    :param args: - List of arguments to parse
    261    :returns: - Optional string argument value
    262    """
    263    parser = argparse.ArgumentParser()
    264    parser.add_argument(*arg_names, action="store", dest="value", default=None)
    265    parsed, _ = parser.parse_known_args(args)
    266    return parsed.value
    267 
    268 
    269 def get_pref(session, pref):
    270    """Get the value of the specified preference.
    271 
    272    :param pref: Name of the preference.
    273    """
    274    with using_context(session, "chrome"):
    275        pref_value = session.execute_script(
    276            """
    277            const { Preferences } = ChromeUtils.importESModule(
    278              "resource://gre/modules/Preferences.sys.mjs"
    279            );
    280 
    281            let pref = arguments[0];
    282 
    283            prefs = new Preferences();
    284            return prefs.get(pref, null);
    285            """,
    286            args=(pref,),
    287        )
    288        return pref_value
    289 
    290 
    291 def get_profile_folder(firefox_options):
    292    return get_arg_value(["--profile"], firefox_options["args"])
    293 
    294 
    295 def readOutputLine(stream, callback):
    296    while True:
    297        line = stream.readline()
    298        if not line:
    299            break
    300 
    301        callback(line)
    302 
    303 
    304 def read_user_preferences(profile_path, filename="user.js"):
    305    prefs_file = os.path.join(profile_path, filename)
    306 
    307    prefs = {}
    308    for pref_name, pref_value in Preferences().read_prefs(prefs_file):
    309        prefs[pref_name] = pref_value
    310 
    311    return prefs
    312 
    313 
    314 def set_pref(session, pref, value):
    315    """Set the value of the specified preference.
    316 
    317    :param pref: Name of the preference.
    318    :param value: The value to set the preference to. If the value is None,
    319                  reset the preference to its default value. If no default
    320                  value exists, the preference will cease to exist.
    321    """
    322    if value is None:
    323        clear_pref(session, pref)
    324        return
    325 
    326    with using_context(session, "chrome"):
    327        session.execute_script(
    328            """
    329            const { Preferences } = ChromeUtils.importESModule(
    330              "resource://gre/modules/Preferences.sys.mjs"
    331            );
    332 
    333            const [pref, value] = arguments;
    334 
    335            prefs = new Preferences();
    336            prefs.set(pref, value);
    337            """,
    338            args=(pref, value),
    339        )