tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

mach_commands.py (15009B)


      1 # This Source Code Form is subject to the terms of the Mozilla Public
      2 # License, v. 2.0. If a copy of the MPL was not distributed with this
      3 # file, You can obtain one at http://mozilla.org/MPL/2.0/.
      4 
      5 import argparse
      6 import io
      7 import os
      8 import platform
      9 import sys
     10 
     11 from mach.decorators import Command
     12 from mozbuild.base import BuildEnvironmentNotFoundException, MozbuildObject
     13 
     14 here = os.path.abspath(os.path.dirname(__file__))
     15 
     16 GVE = "org.mozilla.geckoview_example"
     17 
     18 
     19 def get(url):
     20    import requests
     21 
     22    resp = requests.get(url)
     23    resp.raise_for_status()
     24    return resp
     25 
     26 
     27 def untar(fileobj, dest):
     28    import tarfile
     29 
     30    with tarfile.open(fileobj=fileobj, mode="r") as tar_data:
     31        tar_data.extractall(path=dest)
     32 
     33 
     34 def unzip(fileobj, dest):
     35    import zipfile
     36 
     37    with zipfile.ZipFile(fileobj) as zip_data:
     38        zip_data.extractall(path=dest)
     39 
     40 
     41 def writable_dir(path):
     42    if not os.path.isdir(path):
     43        raise argparse.ArgumentTypeError(f"{path} is not a valid dir")
     44    if os.access(path, os.W_OK):
     45        return path
     46    else:
     47        raise argparse.ArgumentTypeError(f"{path} is not a writable dir")
     48 
     49 
     50 def create_parser_interventions():
     51    from mozlog import commandline
     52 
     53    parser = argparse.ArgumentParser()
     54    parser.add_argument("--webdriver-binary", help="Path to webdriver binary")
     55    parser.add_argument(
     56        "--webdriver-port",
     57        action="store",
     58        default="4444",
     59        help="Port on which to run WebDriver",
     60    )
     61    parser.add_argument(
     62        "--webdriver-ws-port",
     63        action="store",
     64        default="9222",
     65        help="Port on which to run WebDriver BiDi websocket",
     66    )
     67    parser.add_argument("-b", "--bugs", nargs="*", help="Bugs to run tests for")
     68    parser.add_argument(
     69        "--do2fa",
     70        action="store_true",
     71        default=False,
     72        help="Do two-factor auth live in supporting tests",
     73    )
     74    parser.add_argument(
     75        "--config", help="Path to JSON file containing logins and other settings"
     76    )
     77    parser.add_argument(
     78        "--debug", action="store_true", default=False, help="Debug failing tests"
     79    )
     80    parser.add_argument(
     81        "-H",
     82        "--headless",
     83        action="store_true",
     84        default=False,
     85        help="Run firefox in headless mode",
     86    )
     87    parser.add_argument(
     88        "--interventions",
     89        action="store",
     90        default="both",
     91        choices=["enabled", "disabled", "both", "none"],
     92        help="Enable webcompat interventions",
     93    )
     94    parser.add_argument(
     95        "--shims",
     96        action="store",
     97        default="none",
     98        choices=["enabled", "disabled", "both", "none"],
     99        help="Enable SmartBlock shims",
    100    )
    101    parser.add_argument(
    102        "--platform",
    103        action="store",
    104        choices=["android", "desktop"],
    105        help="Platform to target",
    106    )
    107    parser.add_argument(
    108        "--failure-screenshots-dir",
    109        action="store",
    110        type=writable_dir,
    111        help="Path to save failure screenshots",
    112    )
    113    parser.add_argument(
    114        "-s",
    115        "--no-failure-screenshots",
    116        action="store_true",
    117        default=False,
    118        help="Do not save a screenshot for each test failure",
    119    )
    120    parser.add_argument(
    121        "-P",
    122        "--platform-override",
    123        action="store",
    124        choices=["android", "linux", "mac", "windows"],
    125        help="Override key navigator properties to match the given platform and/or use responsive design mode to mimic the given platform",
    126    )
    127 
    128    desktop_group = parser.add_argument_group("Desktop-specific arguments")
    129    desktop_group.add_argument("--binary", help="Path to browser binary")
    130 
    131    android_group = parser.add_argument_group("Android-specific arguments")
    132    android_group.add_argument(
    133        "--device-serial",
    134        action="store",
    135        help="Running Android instances to connect to, if not emulator-5554",
    136    )
    137    android_group.add_argument(
    138        "--package-name",
    139        action="store",
    140        default=GVE,
    141        help="Android package name to use",
    142    )
    143 
    144    commandline.add_logging_group(parser)
    145    return parser
    146 
    147 
    148 class InterventionTest(MozbuildObject):
    149    def set_default_kwargs(self, logger, command_context, kwargs):
    150        platform = kwargs["platform"]
    151        binary = kwargs["binary"]
    152        device_serial = kwargs["device_serial"]
    153        try:
    154            is_gve_build = command_context.substs.get("MOZ_APP_NAME") == "fennec"
    155        except BuildEnvironmentNotFoundException:
    156            # If we don't have a build, just use the logic below to choose between
    157            # desktop and Android
    158            is_gve_build = False
    159 
    160        if platform == "android" or (
    161            platform is None and binary is None and (device_serial or is_gve_build)
    162        ):
    163            kwargs["platform"] = "android"
    164        else:
    165            kwargs["platform"] = "desktop"
    166 
    167        if kwargs["platform"] == "desktop" and kwargs["binary"] is None:
    168            kwargs["binary"] = self.get_binary_path()
    169 
    170        if kwargs["webdriver_binary"] is None:
    171            webdriver_binary = self.get_binary_path(
    172                "geckodriver", validate_exists=False
    173            )
    174 
    175            if not os.path.exists(webdriver_binary):
    176                webdriver_binary = self.install_geckodriver(
    177                    logger, dest=os.path.dirname(webdriver_binary)
    178                )
    179 
    180            if not os.path.exists(webdriver_binary):
    181                logger.error("Can't find geckodriver")
    182                sys.exit(1)
    183            kwargs["webdriver_binary"] = webdriver_binary
    184 
    185    def platform_string_geckodriver(self):
    186        uname = platform.uname()
    187        platform_name = {"Linux": "linux", "Windows": "win", "Darwin": "macos"}.get(
    188            uname[0]
    189        )
    190 
    191        if platform_name in ("linux", "win"):
    192            bits = "64" if uname[4] == "x86_64" else "32"
    193        elif platform_name == "macos":
    194            bits = ""
    195        else:
    196            raise ValueError(f"No precompiled geckodriver for platform {uname}")
    197 
    198        return f"{platform_name}{bits}"
    199 
    200    def install_geckodriver(self, logger, dest):
    201        """Install latest Geckodriver."""
    202        if dest is None:
    203            dest = os.path.join(self.distdir, "dist", "bin")
    204 
    205        is_windows = platform.uname()[0] == "Windows"
    206 
    207        release = get(
    208            "https://api.github.com/repos/mozilla/geckodriver/releases/latest"
    209        ).json()
    210        ext = "zip" if is_windows else "tar.gz"
    211        platform_name = self.platform_string_geckodriver()
    212        name_suffix = f"-{platform_name}.{ext}"
    213        for item in release["assets"]:
    214            if item["name"].endswith(name_suffix):
    215                url = item["browser_download_url"]
    216                break
    217        else:
    218            raise ValueError(f"Failed to find geckodriver for platform {platform_name}")
    219 
    220        logger.info(f"Installing geckodriver from {url}")
    221 
    222        data = io.BytesIO(get(url).content)
    223        data.seek(0)
    224        decompress = unzip if ext == "zip" else untar
    225        decompress(data, dest=dest)
    226 
    227        exe_ext = ".exe" if is_windows else ""
    228        path = os.path.join(dest, f"geckodriver{exe_ext}")
    229 
    230        return path
    231 
    232    def setup_device(self, command_context, kwargs):
    233        if kwargs["platform"] != "android":
    234            return
    235 
    236        app = kwargs["package_name"]
    237        device_serial = kwargs["device_serial"]
    238 
    239        if not device_serial:
    240            from mozrunner.devices.android_device import (
    241                InstallIntent,
    242                verify_android_device,
    243            )
    244 
    245            # verify_android_device sets up device/emulator and records selected
    246            # one to DEVICE_SERIAL environment.
    247            verify_android_device(
    248                command_context, app=app, network=True, install=InstallIntent.YES
    249            )
    250            kwargs["device_serial"] = os.environ.get("DEVICE_SERIAL")
    251 
    252        # GVE does not have the webcompat addon by default. Add it.
    253        if app == GVE:
    254            kwargs["addon"] = "/data/local/tmp/webcompat.xpi"
    255            push_to_device(
    256                command_context.substs["ADB"],
    257                device_serial,
    258                webcompat_addon(command_context),
    259                kwargs["addon"],
    260            )
    261 
    262    def run(self, command_context, **kwargs):
    263        import mozlog
    264        import runner
    265 
    266        mozlog.commandline.setup_logging(
    267            "test-interventions", kwargs, {"mach": sys.stdout}
    268        )
    269        logger = mozlog.get_default_logger("test-interventions")
    270 
    271        log_level = "INFO"
    272        # It's not trivial to get a single log level out of mozlog, because we might have
    273        # different levels going to different outputs. We look for the maximum (i.e. most
    274        # verbose) level of any handler with an attached formatter.
    275        configured_level_number = None
    276        for handler in logger.handlers:
    277            if hasattr(handler, "formatter") and hasattr(handler.formatter, "level"):
    278                formatter_level = handler.formatter.level
    279                configured_level_number = (
    280                    formatter_level
    281                    if configured_level_number is None
    282                    else max(configured_level_number, formatter_level)
    283                )
    284        if configured_level_number is not None:
    285            for level, number in mozlog.structuredlog.log_levels.items():
    286                if number == configured_level_number:
    287                    log_level = level
    288                    break
    289 
    290        status_handler = mozlog.handlers.StatusHandler()
    291        logger.add_handler(status_handler)
    292 
    293        self.set_default_kwargs(logger, command_context, kwargs)
    294 
    295        self.setup_device(command_context, kwargs)
    296 
    297        if kwargs["interventions"] != "none":
    298            interventions = (
    299                ["enabled", "disabled"]
    300                if kwargs["interventions"] == "both"
    301                else [kwargs["interventions"]]
    302            )
    303 
    304            for interventions_setting in interventions:
    305                runner.run(
    306                    logger,
    307                    os.path.join(here, "interventions"),
    308                    kwargs["webdriver_binary"],
    309                    kwargs["webdriver_port"],
    310                    kwargs["webdriver_ws_port"],
    311                    browser_binary=kwargs.get("binary"),
    312                    device_serial=kwargs.get("device_serial"),
    313                    package_name=kwargs.get("package_name"),
    314                    addon=kwargs.get("addon"),
    315                    bugs=kwargs["bugs"],
    316                    debug=kwargs["debug"],
    317                    interventions=interventions_setting,
    318                    config=kwargs["config"],
    319                    headless=kwargs["headless"],
    320                    do2fa=kwargs["do2fa"],
    321                    log_level=log_level,
    322                    failure_screenshots_dir=kwargs.get("failure_screenshots_dir"),
    323                    no_failure_screenshots=kwargs.get("no_failure_screenshots"),
    324                    platform_override=kwargs.get("platform_override"),
    325                )
    326 
    327        if kwargs["shims"] != "none":
    328            shims = (
    329                ["enabled", "disabled"]
    330                if kwargs["shims"] == "both"
    331                else [kwargs["shims"]]
    332            )
    333 
    334            for shims_setting in shims:
    335                runner.run(
    336                    logger,
    337                    os.path.join(here, "shims"),
    338                    kwargs["webdriver_binary"],
    339                    kwargs["webdriver_port"],
    340                    kwargs["webdriver_ws_port"],
    341                    browser_binary=kwargs.get("binary"),
    342                    device_serial=kwargs.get("device_serial"),
    343                    package_name=kwargs.get("package_name"),
    344                    addon=kwargs.get("addon"),
    345                    bugs=kwargs["bugs"],
    346                    debug=kwargs["debug"],
    347                    shims=shims_setting,
    348                    config=kwargs["config"],
    349                    headless=kwargs["headless"],
    350                    do2fa=kwargs["do2fa"],
    351                    failure_screenshots_dir=kwargs.get("failure_screenshots_dir"),
    352                    no_failure_screenshots=kwargs.get("no_failure_screenshots"),
    353                    platform_override=kwargs.get("platform_override"),
    354                )
    355 
    356        summary = status_handler.summarize()
    357        passed = (
    358            summary.unexpected_statuses == 0
    359            and summary.log_level_counts.get("ERROR", 0) == 0
    360            and summary.log_level_counts.get("CRITICAL", 0) == 0
    361        )
    362        return passed
    363 
    364 
    365 def webcompat_addon(command_context):
    366    import shutil
    367    import tempfile
    368 
    369    src = os.path.join(command_context.topsrcdir, "browser", "extensions", "webcompat")
    370 
    371    # We use #include directives in the system addon's moz.build (to inject our JSON config
    372    # into interventions.js), so we must do that here to make a working XPI.
    373    tmpdir_kwargs = {}
    374    if sys.version_info.major >= 3 and sys.version_info.minor >= 10:
    375        tmpdir_kwargs["ignore_cleanup_errors"] = True
    376    with tempfile.TemporaryDirectory(**tmpdir_kwargs) as src_copy:
    377 
    378        def process_includes(path):
    379            fullpath = os.path.join(src_copy, path)
    380            in_lines = None
    381            with open(fullpath) as f:
    382                in_lines = f.readlines()
    383            with open(fullpath, "w") as f:
    384                for line in in_lines:
    385                    if not line.startswith("#include"):
    386                        f.write(line)
    387                        continue
    388                    include_path = line.split()[1]
    389                    include_fullpath = os.path.join(
    390                        os.path.dirname(fullpath), include_path
    391                    )
    392                    with open(include_fullpath) as inc:
    393                        f.write(inc.read())
    394                    f.write("\n")
    395 
    396        shutil.copytree(src, src_copy, dirs_exist_ok=True)
    397        process_includes("run.js")
    398 
    399        dst = os.path.join(
    400            command_context.virtualenv_manager.virtualenv_root, "webcompat.xpi"
    401        )
    402        shutil.make_archive(dst, "zip", src_copy)
    403        shutil.move(f"{dst}.zip", dst)
    404        return dst
    405 
    406 
    407 def push_to_device(adb_path, device_serial, local_path, remote_path):
    408    from mozdevice import ADBDeviceFactory
    409 
    410    device = ADBDeviceFactory(adb=adb_path, device=device_serial)
    411    device.push(local_path, remote_path)
    412    device.chmod(remote_path)
    413 
    414 
    415 @Command(
    416    "test-interventions",
    417    category="testing",
    418    description="Test the webcompat interventions",
    419    parser=create_parser_interventions,
    420    virtualenv_name="webcompat",
    421 )
    422 def test_interventions(command_context, **params):
    423    here = os.path.abspath(os.path.dirname(__file__))
    424    command_context.virtualenv_manager.activate()
    425    command_context.virtualenv_manager.install_pip_requirements(
    426        os.path.join(here, "requirements.txt"),
    427        require_hashes=False,
    428    )
    429 
    430    intervention_test = command_context._spawn(InterventionTest)
    431    return 0 if intervention_test.run(command_context, **params) else 1