tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

browser.py (111629B)


      1 # mypy: allow-untyped-defs
      2 import configparser
      3 import json
      4 import os
      5 import platform
      6 import re
      7 import shutil
      8 import stat
      9 import subprocess
     10 import sys
     11 import tempfile
     12 from abc import ABCMeta, abstractmethod
     13 from datetime import datetime, timedelta, timezone
     14 from shutil import which
     15 from typing import Any, Dict, List, Optional, Tuple
     16 from urllib.parse import urlsplit, quote
     17 
     18 import html5lib
     19 import requests
     20 from packaging.specifiers import SpecifierSet
     21 
     22 from .utils import (
     23    call,
     24    get,
     25    get_download_to_descriptor,
     26    rmtree,
     27    sha256sum,
     28    untar,
     29    unzip,
     30 )
     31 from .wpt import venv_dir
     32 
     33 uname = platform.uname()
     34 
     35 # The root URL for Chrome for Testing API endpoints.
     36 CHROME_FOR_TESTING_ROOT_URL = "https://googlechromelabs.github.io/chrome-for-testing/"
     37 # File name containing a matching ChromeDriver download URL for a specific Chrome download.
     38 CHROMEDRIVER_SAVED_DOWNLOAD_FILE = "matching_chromedriver_url.txt"
     39 
     40 # the rootUrl for the firefox-ci deployment of Taskcluster
     41 FIREFOX_CI_ROOT_URL = 'https://firefox-ci-tc.services.mozilla.com'
     42 
     43 
     44 def _get_fileversion(binary, logger=None):
     45    command = "(Get-Item -ErrorAction Stop '%s').VersionInfo.FileVersion" % binary.replace("'", "''")
     46    try:
     47        return call("powershell.exe", command).strip()
     48    except (subprocess.CalledProcessError, OSError):
     49        if logger is not None:
     50            logger.warning("Failed to call %s in PowerShell" % command)
     51        return None
     52 
     53 
     54 def get_ext(filename):
     55    """Get the extension from a filename with special handling for .tar.foo"""
     56    name, ext = os.path.splitext(filename)
     57    if name.endswith(".tar"):
     58        ext = ".tar%s" % ext
     59    return ext
     60 
     61 
     62 def get_download_filename(resp, default=None):
     63    """Get the filename from a requests.Response, or default"""
     64    filename = None
     65 
     66    content_disposition = resp.headers.get("content-disposition")
     67    if content_disposition:
     68        filenames = re.findall("filename=(.+)", content_disposition)
     69        if filenames:
     70            filename = filenames[0]
     71 
     72    if not filename:
     73        filename = urlsplit(resp.url).path.rsplit("/", 1)[1]
     74 
     75    return filename or default
     76 
     77 
     78 def get_taskcluster_artifact(index, path):
     79    TC_INDEX_BASE = FIREFOX_CI_ROOT_URL + "/api/index/v1/"
     80 
     81    resp = get(TC_INDEX_BASE + "task/%s/artifacts/%s" % (index, path))
     82    resp.raise_for_status()
     83 
     84    return resp
     85 
     86 
     87 def get_file_github(repo: str, ref: str, path: str) -> bytes:
     88    data: bytes = get(f"https://raw.githubusercontent.com/{repo}/{ref}/{path}").content  # type: ignore
     89    return data
     90 
     91 
     92 class Browser(metaclass=ABCMeta):
     93    def __init__(self, logger):
     94        self.logger = logger
     95 
     96    def _get_browser_download_dir(self, dest, channel):
     97        if dest is None:
     98            return self._get_browser_binary_dir(dest, channel)
     99 
    100        return dest
    101 
    102    def _get_browser_binary_dir(self, dest, channel):
    103        if dest is None:
    104            # os.getcwd() doesn't include the venv path
    105            dest = os.path.join(os.getcwd(), venv_dir())
    106 
    107        dest = os.path.join(dest, "browsers", channel)
    108 
    109        if not os.path.exists(dest):
    110            os.makedirs(dest)
    111 
    112        return dest
    113 
    114    def download_from_url(
    115        self, url, dest=None, channel=None, rename=None, default_name="download"
    116    ):
    117        """Download a URL into a dest/channel
    118        :param url: The URL to download
    119        :param dest: Directory in which to put the dowloaded
    120        :param channel: Browser channel to append to the dest
    121        :param rename: Optional name for the download; the original extension
    122                       is preserved
    123        :param default_name: The default name for the download if none is
    124                             provided and none can be found from the network
    125        :return: The path to the downloaded package/installer
    126        """
    127        self.logger.info("Downloading from %s" % url)
    128 
    129        dest = self._get_browser_download_dir(dest, channel)
    130 
    131        resp = get(url)
    132        filename = get_download_filename(resp, default_name)
    133        if rename:
    134            filename = "%s%s" % (rename, get_ext(filename))
    135 
    136        output_path = os.path.join(dest, filename)
    137 
    138        with open(output_path, "wb") as f:
    139            for chunk in resp.iter_content(chunk_size=64 * 1024):
    140                f.write(chunk)
    141 
    142        return output_path
    143 
    144    @abstractmethod
    145    def download(self, dest=None, channel=None, rename=None):
    146        """Download a package or installer for the browser
    147        :param dest: Directory in which to put the dowloaded package
    148        :param channel: Browser channel to download
    149        :param rename: Optional name for the downloaded package; the original
    150                       extension is preserved.
    151        :return: The path to the downloaded package/installer
    152        """
    153        return NotImplemented
    154 
    155    @abstractmethod
    156    def install(self, dest=None, channel=None):
    157        """Download and install the browser.
    158 
    159        This method usually calls download().
    160 
    161        :param dest: Directory in which to install the browser
    162        :param channel: Browser channel to install
    163        :return: The path to the installed browser
    164        """
    165        return NotImplemented
    166 
    167    @abstractmethod
    168    def install_webdriver(self, dest=None, channel=None, browser_binary=None):
    169        """Download and install the WebDriver implementation for this browser.
    170 
    171        :param dest: Directory in which to install the WebDriver
    172        :param channel: Browser channel to install
    173        :param browser_binary: The path to the browser binary
    174        :return: The path to the installed WebDriver
    175        """
    176        return NotImplemented
    177 
    178    @abstractmethod
    179    def find_binary(self, venv_path=None, channel=None):
    180        """Find the binary of the browser.
    181 
    182        If the WebDriver for the browser is able to find the binary itself, this
    183        method doesn't need to be implemented, in which case NotImplementedError
    184        is suggested to be raised to prevent accidental use.
    185        """
    186        return NotImplemented
    187 
    188    @abstractmethod
    189    def find_webdriver(self, venv_path=None, channel=None):
    190        """Find the binary of the WebDriver."""
    191        return NotImplemented
    192 
    193    @abstractmethod
    194    def version(self, binary=None, webdriver_binary=None):
    195        """Retrieve the release version of the installed browser."""
    196        return NotImplemented
    197 
    198    @abstractmethod
    199    def requirements(self):
    200        """Name of the browser-specific wptrunner requirements file"""
    201        return NotImplemented
    202 
    203 
    204 class FirefoxPrefs:
    205    def __init__(self, logger):
    206        self.logger = logger
    207 
    208    def install_prefs(self, binary: Optional[str], dest: Optional[str] = None, channel: Optional[str] = None) -> str:
    209        if binary and not binary.endswith(".apk"):
    210            version, channel_, rev = self.get_version_and_channel(binary)
    211            if channel is not None and channel != channel_:
    212                # Beta doesn't always seem to have the b in the version string, so allow the
    213                # manually supplied value to override the one from the binary
    214                self.logger.warning("Supplied channel doesn't match binary, using supplied channel")
    215            elif channel is None:
    216                channel = channel_
    217        else:
    218            rev = None
    219            version = None
    220 
    221        if channel is None:
    222            self.logger.warning("No browser channel passed to install_prefs, taking prefs from main branch")
    223            channel = "nightly"
    224 
    225        if dest is None:
    226            dest = os.curdir
    227 
    228        dest = os.path.join(dest, "profiles", rev if rev is not None else channel)
    229        if version:
    230            dest = os.path.join(dest, version)
    231        have_cache = False
    232        if os.path.exists(dest) and os.path.exists(os.path.join(dest, "profiles.json")):
    233            if channel != "nightly":
    234                have_cache = True
    235            else:
    236                now = datetime.now()
    237                have_cache = (datetime.fromtimestamp(os.stat(dest).st_mtime) >
    238                              now - timedelta(days=1))
    239 
    240        # If we don't have a recent download, grab and extract the latest one
    241        if not have_cache:
    242            if os.path.exists(dest):
    243                rmtree(dest)
    244            os.makedirs(dest)
    245 
    246            self.get_profile_github(version, channel, dest, rev)
    247            self.logger.info(f"Test prefs downloaded to {dest}")
    248        else:
    249            self.logger.info(f"Using cached test prefs from {dest}")
    250 
    251        return dest
    252 
    253    def get_profile_github(self, version: Optional[str], channel: str, dest: str, rev: Optional[str]) -> None:
    254        """Read the testing/profiles data from firefox source on GitHub"""
    255 
    256        # There are several possible approaches here, none of which are great:
    257        # 1. Shallow, sparse, clone of the repo with no history and just the testing/profiles
    258        #    directory. This is too slow to be usable.
    259        # 2. Use the Github repository contents API to read all the files under that directory.
    260        #    This requires auth to not run into rate limits.
    261        # 3. Gitub tree API has basically the same problems
    262        # 4. Download a full archive of the relevant commit from Github and extract only the
    263        #    required directory. This is also too slow to be useful.
    264        #
    265        # In the end we use githubusercontent.com, which has the problem that it doesn't allow
    266        # directory listings. So we have to hardcode in all the files we need. In particular
    267        # for each profile we are currently just downloading the user.js file and ignoring the
    268        # extensions/ directory, which is currently unused.
    269        ref = self.get_git_ref(version, channel, rev)
    270        self.logger.info(f"Getting profile data from git ref {ref}")
    271        file_data = {}
    272        profiles_bytes = get_file_github("mozilla-firefox/firefox", ref, "testing/profiles/profiles.json")
    273        profiles = json.loads(profiles_bytes)
    274        file_data["profiles.json"] = profiles_bytes
    275        for subdir in profiles["web-platform-tests"]:
    276            rel_path = os.path.join(subdir, "user.js")
    277            file_data[rel_path] = get_file_github("mozilla-firefox/firefox",
    278                                                  ref,
    279                                                  f"testing/profiles/{subdir}/user.js")
    280 
    281        for path, data in file_data.items():
    282            dest_path = os.path.join(dest, path)
    283            os.makedirs(os.path.dirname(dest_path), exist_ok=True)
    284            with open(dest_path, "wb") as f:
    285                f.write(data)
    286 
    287    def get_version_and_channel(self, binary: str) -> Tuple[Optional[str], str, Optional[str]]:
    288        application_ini_path = os.path.join(os.path.dirname(binary), "application.ini")
    289        if os.path.exists(application_ini_path):
    290            try:
    291                return self.get_version_and_channel_application_ini(application_ini_path)
    292            except ValueError as e:
    293                self.logger.info(f"Reading application ini file failed: {e}")
    294                # Fall back to calling the binary
    295        version_string: str = call(binary, "--version").strip()  # type: ignore
    296        version_re = re.compile(r"Mozilla Firefox (.*)")
    297        m = version_re.match(version_string)
    298        if not m:
    299            return None, "nightly", None
    300        version, channel = self.extract_version_number(m.group(1))
    301        return version, channel, None
    302 
    303    def get_version_and_channel_application_ini(self, path: str) -> Tuple[Optional[str], str, Optional[str]]:
    304        """Try to read application version from an ini file
    305 
    306        This doesn't work in all cases e.g. local builds don't have
    307        all the information, or builds where the binary path is a shell
    308        script or similar."""
    309        config = configparser.ConfigParser()
    310        paths = config.read(path)
    311        if path not in paths:
    312            raise ValueError("Failed to read config file")
    313 
    314        version = config.get("App", "Version", fallback=None)
    315        if version is None:
    316            raise ValueError("Failed to find Version key")
    317        version, channel = self.extract_version_number(version)
    318 
    319        rev = None
    320        if channel == "nightly":
    321            source_repo = config.get("App", "SourceRepository", fallback=None)
    322            commit = config.get("App", "SourceStamp", fallback=None)
    323            if source_repo is not None and commit is not None:
    324                if source_repo.startswith("https://hg.mozilla.org"):
    325                    try:
    326                        commit_data: Dict[str, Any] = get(
    327                            f"https://hg-edge.mozilla.org/integration/autoland/json-rev/{commit}"
    328                        ).json()  # type: ignore
    329                        rev = commit_data.get("git_commit")
    330                    except Exception:
    331                        pass
    332                else:
    333                    rev = commit
    334 
    335        return version, channel, rev
    336 
    337    def extract_version_number(self, version_string: str) -> Tuple[Optional[str], str]:
    338        version_parser = re.compile(r"^(\d+)\.(\d+)(?:\.(\d+))?((a|b)\d+)?")
    339        m = version_parser.match(version_string)
    340        if not m:
    341            return None, "nightly"
    342        major, minor, patch, pre, channel_id = m.groups()
    343        version = f"{major}.{minor}"
    344        if patch is not None:
    345            version += f".{patch}"
    346        if pre is not None:
    347            version += pre
    348        channel = {"a": "nightly", "b": "beta"}.get(channel_id, "stable")
    349        return version, channel
    350 
    351    def get_git_tags(self, ref_prefix: str) -> List[str]:
    352        tags = []
    353        for tag_data in get(
    354                f"https://api.github.com/repos/mozilla-firefox/firefox/git/matching-refs/tags/{ref_prefix}"
    355        ).json():  # type: ignore
    356            tag = tag_data["ref"].rsplit("/", 1)[1]
    357            tags.append(tag)
    358        return tags
    359 
    360    def get_git_ref(self, version: Optional[str], channel: str, rev: Optional[str]) -> str:
    361        if rev is not None:
    362            return rev
    363 
    364        ref_prefix = "FIREFOX_"
    365        ref_re = None
    366        tags = []
    367 
    368        if channel == "stable":
    369            if version:
    370                return "FIREFOX_%s_RELEASE" % version.replace(".", "_")
    371            ref_re = re.compile(r"FIREFOX_(\d+)_(\d+)(?:_(\d+))?_RELEASE")
    372        elif channel == "beta":
    373            if version:
    374                ref_prefix = "FIREFOX_%s" % version.replace(".", "_")
    375                if "b" not in version:
    376                    ref_re = re.compile(fr"{ref_prefix}b(\d+)_(?:BUILD(\d+)|RELEASE)")
    377                else:
    378                    ref_re = re.compile(fr"{ref_prefix}_(?:BUILD(\d+)|RELEASE)")
    379            else:
    380                ref_re = re.compile(r"FIREFOX_(\d+)_(\d+)b(\d+)_(?:BUILD(\d+)|RELEASE)")
    381        else:
    382            return "main"
    383 
    384        assert ref_re is not None
    385 
    386        for tag in self.get_git_tags(ref_prefix):
    387            m = ref_re.match(tag)
    388            if not m:
    389                continue
    390            order = [int(item) for item in m.groups() if item is not None]
    391            if channel == "beta" and tag.endswith("_RELEASE"):
    392                order[-1] = sys.maxsize
    393            tags.append((tuple(order), tag))
    394        if not tags:
    395            raise ValueError(f"No tag found for version {version} channel {channel}")
    396        return max(tags)[1]
    397 
    398 
    399 class FirefoxAndroidPrefs(FirefoxPrefs):
    400    def get_git_ref(self, version: Optional[str], channel: str, rev: Optional[str]) -> str:
    401        if rev is not None:
    402            return rev
    403 
    404        tags = []
    405        ref_prefix = "FIREFOX-ANDROID_"
    406        ref_re = None
    407        if channel == "stable":
    408            if version is not None:
    409                return "FIREFOX-ANDROID_%s_RELEASE" % version.replace(".", "_")
    410 
    411            ref_re = re.compile(r"FIREFOX-ANDROID_(\d+)_(\d+)(?:_(\d+))?_RELEASE")
    412 
    413        elif channel == "beta":
    414            if version:
    415                ref_prefix = "FIREFOX-ANDROID_%s" % version.replace(".", "_")
    416            ref_re = re.compile(r"FIREFOX-ANDROID_(\d+)_(\d+)b(\d+)_RELEASE")
    417        else:
    418            return "main"
    419 
    420        assert ref_re is not None
    421        for tag in self.get_git_tags(ref_prefix):
    422            m = ref_re.match(tag)
    423            if m is None:
    424                continue
    425            order = tuple(int(item) for item in m.groups() if item is not None)
    426            tags.append((order, tag))
    427 
    428        if not tags:
    429            raise ValueError(f"No tag found for {version} beta")
    430        return max(tags)[1]
    431 
    432 
    433 class Firefox(Browser):
    434    """Firefox-specific interface.
    435 
    436    Includes installation, webdriver installation, and wptrunner setup methods.
    437    """
    438 
    439    product = "firefox"
    440    binary = "browsers/firefox/firefox"
    441    requirements = "requirements_firefox.txt"
    442 
    443    platform = {
    444        "Linux": "linux",
    445        "Windows": "win",
    446        "Darwin": "macos"
    447    }.get(uname[0])
    448 
    449    application_name = {
    450        "stable": "Firefox.app",
    451        "beta": "Firefox.app",
    452        "nightly": "Firefox Nightly.app"
    453    }
    454 
    455    def platform_string_geckodriver(self):
    456        if self.platform is None:
    457            raise ValueError("Unable to construct a valid Geckodriver package name for current platform")
    458 
    459        if self.platform in ("linux", "win"):
    460            bits = "64" if uname[4] == "x86_64" else "32"
    461        elif self.platform == "macos" and uname.machine == "arm64":
    462            bits = "-aarch64"
    463        else:
    464            bits = ""
    465 
    466        return "%s%s" % (self.platform, bits)
    467 
    468    def download(self, dest=None, channel="nightly", rename=None):
    469        product = {
    470            "nightly": "firefox-nightly-latest-ssl",
    471            "beta": "firefox-beta-latest-ssl",
    472            "stable": "firefox-latest-ssl"
    473        }
    474 
    475        os_builds = {
    476            ("linux", "x86"): "linux",
    477            ("linux", "x86_64"): "linux64",
    478            ("win", "x86"): "win",
    479            ("win", "AMD64"): "win64",
    480            ("macos", "x86_64"): "osx",
    481            ("macos", "arm64"): "osx",
    482        }
    483        os_key = (self.platform, uname[4])
    484 
    485        dest = self._get_browser_download_dir(dest, channel)
    486 
    487        if channel not in product:
    488            raise ValueError("Unrecognised release channel: %s" % channel)
    489 
    490        if os_key not in os_builds:
    491            raise ValueError("Unsupported platform: %s %s" % os_key)
    492 
    493        url = "https://download.mozilla.org/?product=%s&os=%s&lang=en-US" % (product[channel],
    494                                                                             os_builds[os_key])
    495        self.logger.info("Downloading Firefox from %s" % url)
    496        resp = get(url)
    497 
    498        filename = get_download_filename(resp, "firefox.tar.bz2")
    499 
    500        if rename:
    501            filename = "%s%s" % (rename, get_ext(filename))
    502 
    503        installer_path = os.path.join(dest, filename)
    504 
    505        with open(installer_path, "wb") as f:
    506            f.write(resp.content)
    507 
    508        return installer_path
    509 
    510    def install(self, dest=None, channel="nightly"):
    511        """Install Firefox."""
    512        import mozinstall
    513 
    514        dest = self._get_browser_binary_dir(dest, channel)
    515 
    516        filename = os.path.basename(dest)
    517 
    518        installer_path = self.download(dest, channel)
    519 
    520        try:
    521            mozinstall.install(installer_path, dest)
    522        except mozinstall.mozinstall.InstallError:
    523            if self.platform == "macos" and os.path.exists(os.path.join(dest, self.application_name.get(channel, "Firefox Nightly.app"))):
    524                # mozinstall will fail if nightly is already installed in the venv because
    525                # mac installation uses shutil.copy_tree
    526                mozinstall.uninstall(os.path.join(dest, self.application_name.get(channel, "Firefox Nightly.app")))
    527                mozinstall.install(filename, dest)
    528            else:
    529                raise
    530 
    531        os.remove(installer_path)
    532        return self.find_binary_path(dest)
    533 
    534    def install_prefs(self, binary, dest=None, channel=None):
    535        return FirefoxPrefs(self.logger).install_prefs(binary, dest, channel)
    536 
    537    def find_binary_path(self, path=None, channel="nightly"):
    538        """Looks for the firefox binary in the virtual environment"""
    539 
    540        if path is None:
    541            path = self._get_browser_binary_dir(None, channel)
    542 
    543        binary = None
    544 
    545        if self.platform == "linux":
    546            binary = which("firefox", path=os.path.join(path, "firefox"))
    547        elif self.platform == "win":
    548            import mozinstall
    549            try:
    550                binary = mozinstall.get_binary(path, "firefox")
    551            except mozinstall.InvalidBinary:
    552                # ignore the case where we fail to get a binary
    553                pass
    554        elif self.platform == "macos":
    555            binary = which("firefox",
    556                           path=os.path.join(path,
    557                                             self.application_name.get(channel, "Firefox Nightly.app"),
    558                                             "Contents", "MacOS"))
    559 
    560        return binary
    561 
    562    def find_binary(self, venv_path=None, channel="nightly"):
    563 
    564        path = self._get_browser_binary_dir(venv_path, channel)
    565        binary = self.find_binary_path(path, channel)
    566 
    567        if not binary and self.platform == "win":
    568            winpaths = [os.path.expandvars("$SYSTEMDRIVE\\Program Files\\Mozilla Firefox"),
    569                        os.path.expandvars("$SYSTEMDRIVE\\Program Files (x86)\\Mozilla Firefox")]
    570            for winpath in winpaths:
    571                binary = self.find_binary_path(winpath, channel)
    572                if binary is not None:
    573                    break
    574 
    575        if not binary and self.platform == "macos":
    576            macpaths = ["/Applications/Firefox Nightly.app/Contents/MacOS",
    577                        os.path.expanduser("~/Applications/Firefox Nightly.app/Contents/MacOS"),
    578                        "/Applications/Firefox Developer Edition.app/Contents/MacOS",
    579                        os.path.expanduser("~/Applications/Firefox Developer Edition.app/Contents/MacOS"),
    580                        "/Applications/Firefox.app/Contents/MacOS",
    581                        os.path.expanduser("~/Applications/Firefox.app/Contents/MacOS")]
    582            return which("firefox", path=os.pathsep.join(macpaths))
    583 
    584        if binary is None:
    585            return which("firefox")
    586 
    587        return binary
    588 
    589    def find_certutil(self):
    590        path = which("certutil")
    591        if path is None:
    592            return None
    593        if os.path.splitdrive(os.path.normcase(path))[1].split(os.path.sep) == ["", "windows", "system32", "certutil.exe"]:
    594            return None
    595        return path
    596 
    597    def find_webdriver(self, venv_path=None, channel=None):
    598        return which("geckodriver")
    599 
    600    def _latest_geckodriver_version(self):
    601        """Get and return latest version number for geckodriver."""
    602        # This is used rather than an API call to avoid rate limits
    603        tags = call("git", "ls-remote", "--tags", "--refs",
    604                    "https://github.com/mozilla/geckodriver.git")
    605        release_re = re.compile(r".*refs/tags/v(\d+)\.(\d+)\.(\d+)")
    606        latest_release = (0, 0, 0)
    607        for item in tags.split("\n"):
    608            m = release_re.match(item)
    609            if m:
    610                version = tuple(int(item) for item in m.groups())
    611                if version > latest_release:
    612                    latest_release = version
    613        assert latest_release != (0, 0, 0)
    614        return "v%s.%s.%s" % tuple(str(item) for item in latest_release)
    615 
    616    def install_webdriver(self, dest=None, channel=None, browser_binary=None):
    617        """Install latest Geckodriver."""
    618        if dest is None:
    619            dest = os.getcwd()
    620 
    621        path = None
    622        if channel == "nightly":
    623            path = self.install_geckodriver_nightly(dest)
    624            if path is None:
    625                self.logger.warning("Nightly webdriver not found; falling back to release")
    626 
    627        if path is None:
    628            version = self._latest_geckodriver_version()
    629            format = "zip" if uname[0] == "Windows" else "tar.gz"
    630            self.logger.debug("Latest geckodriver release %s" % version)
    631            url = ("https://github.com/mozilla/geckodriver/releases/download/%s/geckodriver-%s-%s.%s" %
    632                   (version, version, self.platform_string_geckodriver(), format))
    633            if format == "zip":
    634                unzip(get(url).raw, dest=dest)
    635            else:
    636                untar(get(url).raw, dest=dest)
    637            path = which("geckodriver", path=dest)
    638 
    639        assert path is not None
    640        self.logger.info("Installed %s" %
    641                         subprocess.check_output([path, "--version"]).splitlines()[0])
    642        return path
    643 
    644    def install_geckodriver_nightly(self, dest):
    645        self.logger.info("Attempting to install webdriver from nightly")
    646 
    647        platform_bits = ("64" if uname[4] == "x86_64" else
    648                         ("32" if self.platform == "win" else ""))
    649        tc_platform = "%s%s" % (self.platform, platform_bits)
    650 
    651        archive_ext = ".zip" if uname[0] == "Windows" else ".tar.gz"
    652        archive_name = "public/build/geckodriver%s" % archive_ext
    653 
    654        try:
    655            resp = get_taskcluster_artifact(
    656                "gecko.v2.mozilla-central.latest.geckodriver.%s" % tc_platform,
    657                archive_name)
    658        except Exception:
    659            self.logger.info("Geckodriver download failed")
    660            return
    661 
    662        if archive_ext == ".zip":
    663            unzip(resp.raw, dest)
    664        else:
    665            untar(resp.raw, dest)
    666 
    667        exe_ext = ".exe" if uname[0] == "Windows" else ""
    668        path = os.path.join(dest, "geckodriver%s" % exe_ext)
    669 
    670        self.logger.info("Extracted geckodriver to %s" % path)
    671 
    672        return path
    673 
    674    def version(self, binary=None, webdriver_binary=None):
    675        """Retrieve the release version of the installed browser."""
    676        version_string = call(binary, "--version").strip()
    677        m = re.match(r"Mozilla Firefox (.*)", version_string)
    678        if not m:
    679            return None
    680        return m.group(1)
    681 
    682 
    683 class FirefoxAndroid(Browser):
    684    """Android-specific Firefox interface."""
    685 
    686    product = "firefox_android"
    687    requirements = "requirements_firefox.txt"
    688 
    689    def __init__(self, logger):
    690        super().__init__(logger)
    691        self.apk_path = None
    692        self._fx_browser = Firefox(self.logger)
    693 
    694    def download(self, dest=None, channel=None, rename=None):
    695        if dest is None:
    696            dest = os.pwd
    697 
    698        branches = {
    699            "stable": "mozilla-release",
    700            "beta": "mozilla-beta",
    701        }
    702        branch = branches.get(channel, "mozilla-central")
    703 
    704        resp = get_taskcluster_artifact(
    705            f"gecko.v2.{branch}.shippable.latest.mobile.android-x86_64-opt",
    706            "public/build/geckoview-test_runner.apk")
    707 
    708        filename = "geckoview-test_runner.apk"
    709        if rename:
    710            filename = "%s%s" % (rename, get_ext(filename)[1])
    711        self.apk_path = os.path.join(dest, filename)
    712 
    713        with open(self.apk_path, "wb") as f:
    714            f.write(resp.content)
    715 
    716        return self.apk_path
    717 
    718    def install(self, dest=None, channel=None):
    719        return self.download(dest, channel)
    720 
    721    def install_prefs(self, binary, dest=None, channel=None):
    722        return FirefoxAndroidPrefs(self.logger).install_prefs(binary, dest, channel)
    723 
    724    def find_binary(self, venv_path=None, channel=None):
    725        return self.apk_path
    726 
    727    def find_webdriver(self, venv_path=None, channel=None):
    728        return self._fx_browser.find_webdriver(venv_path, channel)
    729 
    730    def install_webdriver(self, dest=None, channel=None, browser_binary=None):
    731        return self._fx_browser.install_webdriver(dest, channel, None)
    732 
    733    def version(self, binary=None, webdriver_binary=None):
    734        return None
    735 
    736 
    737 class ChromeChromiumBase(Browser):
    738    """
    739    Chrome/Chromium base Browser class for shared functionality between Chrome and Chromium
    740 
    741    For a detailed description on the installation and detection of these browser components,
    742    see https://web-platform-tests.org/running-tests/chrome-chromium-installation-detection.html
    743    """
    744 
    745    requirements: Optional[str] = "requirements_chromium.txt"
    746    platform = {
    747        "Linux": "Linux",
    748        "Windows": "Win",
    749        "Darwin": "Mac",
    750    }.get(uname[0])
    751 
    752    def _build_snapshots_url(self, revision, filename):
    753        return ("https://storage.googleapis.com/chromium-browser-snapshots/"
    754                f"{self._chromium_platform_string}/{revision}/{filename}")
    755 
    756    def _get_latest_chromium_revision(self):
    757        """Returns latest Chromium revision available for download."""
    758        # This is only used if the user explicitly passes "latest" for the revision flag.
    759        # The pinned revision is used by default to avoid unexpected failures as versions update.
    760        revision_url = ("https://storage.googleapis.com/chromium-browser-snapshots/"
    761                        f"{self._chromium_platform_string}/LAST_CHANGE")
    762        return get(revision_url).text.strip()
    763 
    764    def _get_pinned_chromium_revision(self):
    765        """Returns the pinned Chromium revision number."""
    766        return get("https://storage.googleapis.com/wpt-versions/pinned_chromium_revision").text.strip()
    767 
    768    def _get_chromium_revision(self, filename=None, version=None):
    769        """Retrieve a valid Chromium revision to download a browser component."""
    770 
    771        # If a specific version is passed as an argument, we will use it.
    772        if version is not None:
    773            # Detect a revision number based on the version passed.
    774            revision = self._get_base_revision_from_version(version)
    775            if revision is not None:
    776                # File name is needed to test if request is valid.
    777                url = self._build_snapshots_url(revision, filename)
    778                try:
    779                    # Check the status without downloading the content (this is a streaming request).
    780                    get(url)
    781                    return revision
    782                except requests.RequestException:
    783                    self.logger.warning("404: Unsuccessful attempt to download file "
    784                                        f"based on version. {url}")
    785        # If no URL was used in a previous install
    786        # and no version was passed, use the pinned Chromium revision.
    787        revision = self._get_pinned_chromium_revision()
    788 
    789        # If the url is successfully used to download/install, it will be used again
    790        # if another component is also installed during this run (browser/webdriver).
    791        return revision
    792 
    793    def _get_base_revision_from_version(self, version):
    794        """Get a Chromium revision number that is associated with a given version."""
    795        # This is not the single revision associated with the version,
    796        #     but instead is where it branched from. Chromium revisions are just counting
    797        #     commits on the master branch, there are no Chromium revisions for branches.
    798 
    799        version = self._remove_version_suffix(version)
    800 
    801        # Try to find the Chromium build with the same revision.
    802        try:
    803            omaha = get(f"https://omahaproxy.appspot.com/deps.json?version={version}").json()
    804            detected_revision = omaha['chromium_base_position']
    805            return detected_revision
    806        except requests.RequestException:
    807            self.logger.debug("Unsuccessful attempt to detect revision based on version")
    808        return None
    809 
    810    def _remove_existing_chromedriver_binary(self, path):
    811        """Remove an existing ChromeDriver for this product if it exists
    812        in the virtual environment.
    813        """
    814        # There may be an existing chromedriver binary from a previous install.
    815        # To provide a clean install experience, remove the old binary - this
    816        # avoids tricky issues like unzipping over a read-only file.
    817        existing_chromedriver_path = which("chromedriver", path=path)
    818        if existing_chromedriver_path:
    819            self.logger.info(f"Removing existing ChromeDriver binary: {existing_chromedriver_path}")
    820            os.chmod(existing_chromedriver_path, stat.S_IWUSR)
    821            os.remove(existing_chromedriver_path)
    822 
    823    def _remove_version_suffix(self, version):
    824        """Removes channel suffixes from Chrome/Chromium version string (e.g. " dev")."""
    825        return version.split(' ')[0]
    826 
    827    @property
    828    def _chromedriver_platform_string(self):
    829        """Returns a string that represents the suffix of the ChromeDriver
    830        file name when downloaded from Chromium Snapshots.
    831        """
    832        if self.platform == "Linux":
    833            bits = "64" if uname[4] == "x86_64" else "32"
    834        elif self.platform == "Mac":
    835            bits = "64"
    836        elif self.platform == "Win":
    837            bits = "32"
    838        return f"{self.platform.lower()}{bits}"
    839 
    840    @property
    841    def _chromium_platform_string(self):
    842        """Returns a string that is used for the platform directory in Chromium Snapshots"""
    843        if (self.platform == "Linux" or self.platform == "Win") and uname[4] == "x86_64":
    844            return f"{self.platform}_x64"
    845        if self.platform == "Mac" and uname.machine == "arm64":
    846            return "Mac_Arm"
    847        return self.platform
    848 
    849    def find_webdriver(self, venv_path=None, channel=None, browser_binary=None):
    850        if venv_path:
    851            venv_path = os.path.join(venv_path, self.product)
    852        return which("chromedriver", path=venv_path)
    853 
    854    def install_mojojs(self, dest, browser_binary):
    855        """Install MojoJS web framework."""
    856        # MojoJS is platform agnostic, but the version number must be an
    857        # exact match of the Chrome/Chromium version to be compatible.
    858        chrome_version = self.version(binary=browser_binary)
    859        if not chrome_version:
    860            return None
    861        chrome_version = self._remove_version_suffix(chrome_version)
    862 
    863        try:
    864            # MojoJS version url must match the browser binary version exactly.
    865            url = ("https://storage.googleapis.com/chrome-for-testing-public/"
    866                   f"{chrome_version}/mojojs.zip")
    867            # Check the status without downloading the content (this is a streaming request).
    868            get(url)
    869        except requests.RequestException:
    870            # If a valid matching version cannot be found in the CfT archive,
    871            # download from Chromium snapshots bucket. However,
    872            # MojoJS is only bundled with Linux from Chromium snapshots.
    873            if self.platform == "Linux":
    874                filename = "mojojs.zip"
    875                revision = self._get_chromium_revision(filename, chrome_version)
    876                url = self._build_snapshots_url(revision, filename)
    877            else:
    878                self.logger.error("A valid MojoJS version cannot be found "
    879                                  f"for browser binary version {chrome_version}.")
    880                return None
    881 
    882        extracted = os.path.join(dest, "mojojs", "gen")
    883        last_url_file = os.path.join(extracted, "DOWNLOADED_FROM")
    884        if os.path.exists(last_url_file):
    885            with open(last_url_file, "rt") as f:
    886                last_url = f.read().strip()
    887            if last_url == url:
    888                self.logger.info("Mojo bindings already up to date")
    889                return extracted
    890            rmtree(extracted)
    891 
    892        try:
    893            self.logger.info(f"Downloading Mojo bindings from {url}")
    894            unzip(get(url).raw, dest)
    895            with open(last_url_file, "wt") as f:
    896                f.write(url)
    897            return extracted
    898        except Exception as e:
    899            self.logger.error(f"Cannot enable MojoJS: {e}")
    900            return None
    901 
    902    def install_webdriver_by_version(self, version, dest, revision=None):
    903        dest = os.path.join(dest, self.product)
    904        self._remove_existing_chromedriver_binary(dest)
    905        # _get_webdriver_url is implemented differently for Chrome and Chromium because
    906        # they download their respective versions of ChromeDriver from different sources.
    907        url = self._get_webdriver_url(version, revision)
    908        self.logger.info(f"Downloading ChromeDriver from {url}")
    909        unzip(get(url).raw, dest)
    910 
    911        # The two sources of ChromeDriver have different zip structures:
    912        # * Chromium archives the binary inside a chromedriver_* directory;
    913        # * Chrome archives the binary directly.
    914        # We want to make sure the binary always ends up directly in bin/.
    915        chromedriver_dir = os.path.join(dest,
    916                                        f"chromedriver_{self._chromedriver_platform_string}")
    917        chromedriver_path = which("chromedriver", path=chromedriver_dir)
    918        if chromedriver_path is not None:
    919            shutil.move(chromedriver_path, dest)
    920            rmtree(chromedriver_dir)
    921 
    922        chromedriver_path = which("chromedriver", path=dest)
    923        assert chromedriver_path is not None
    924        return chromedriver_path
    925 
    926    def version(self, binary=None, webdriver_binary=None):
    927        if not binary:
    928            self.logger.warning("No browser binary provided.")
    929            return None
    930 
    931        if uname[0] == "Windows":
    932            return _get_fileversion(binary, self.logger)
    933 
    934        try:
    935            version_string = call(binary, "--version").strip()
    936        except (subprocess.CalledProcessError, OSError) as e:
    937            self.logger.warning(f"Failed to call {binary}: {e}")
    938            return None
    939        m = re.match(r"(?:Google Chrome|Chromium) (.*)", version_string)
    940        if not m:
    941            self.logger.warning(f"Failed to extract version from: {version_string}")
    942            return None
    943        return m.group(1)
    944 
    945    def webdriver_version(self, webdriver_binary):
    946        if webdriver_binary is None:
    947            self.logger.warning("No valid webdriver supplied to detect version.")
    948            return None
    949 
    950        try:
    951            version_string = call(webdriver_binary, "--version").strip()
    952        except (subprocess.CalledProcessError, OSError) as e:
    953            self.logger.warning(f"Failed to call {webdriver_binary}: {e}")
    954            return None
    955        m = re.match(r"ChromeDriver ([0-9][0-9.]*)", version_string)
    956        if not m:
    957            self.logger.warning(f"Failed to extract version from: {version_string}")
    958            return None
    959        return m.group(1)
    960 
    961 
    962 class Chromium(ChromeChromiumBase):
    963    """Chromium-specific interface.
    964 
    965    Includes browser binary installation and detection.
    966    Webdriver installation and wptrunner setup shared in base class with Chrome
    967 
    968    For a detailed description on the installation and detection of these browser components,
    969    see https://web-platform-tests.org/running-tests/chrome-chromium-installation-detection.html
    970    """
    971    product = "chromium"
    972 
    973    @property
    974    def _chromium_package_name(self):
    975        return f"chrome-{self.platform.lower()}"
    976 
    977    def _get_existing_browser_revision(self, venv_path, channel):
    978        revision = None
    979        try:
    980            # A file referencing the revision number is saved with the binary.
    981            # Check if this revision number exists and use it if it does.
    982            path = os.path.join(self._get_browser_binary_dir(None, channel), "revision")
    983            with open(path) as f:
    984                revision = f.read().strip()
    985        except FileNotFoundError:
    986            # If there is no information about the revision downloaded,
    987            # use the pinned revision.
    988            revision = self._get_pinned_chromium_revision()
    989        return revision
    990 
    991    def _find_binary_in_directory(self, directory):
    992        """Search for Chromium browser binary in a given directory."""
    993        if uname[0] == "Darwin":
    994            return which("Chromium", path=os.path.join(directory,
    995                                                       self._chromium_package_name,
    996                                                       "Chromium.app",
    997                                                       "Contents",
    998                                                       "MacOS"))
    999        # which will add .exe on Windows automatically.
   1000        return which("chrome", path=os.path.join(directory, self._chromium_package_name))
   1001 
   1002    def _get_webdriver_url(self, version, revision=None):
   1003        """Get Chromium Snapshots url to download Chromium ChromeDriver."""
   1004        filename = f"chromedriver_{self._chromedriver_platform_string}.zip"
   1005 
   1006        # Make sure we use the same revision in an invocation.
   1007        # If we have a url that was last used successfully during this run,
   1008        # that url takes priority over trying to form another.
   1009        if hasattr(self, "last_revision_used") and self.last_revision_used is not None:
   1010            return self._build_snapshots_url(self.last_revision_used, filename)
   1011        if revision is None:
   1012            revision = self._get_chromium_revision(filename, version)
   1013        elif revision == "latest":
   1014            revision = self._get_latest_chromium_revision()
   1015        elif revision == "pinned":
   1016            revision = self._get_pinned_chromium_revision()
   1017 
   1018        return self._build_snapshots_url(revision, filename)
   1019 
   1020    def download(self, dest=None, channel=None, rename=None, version=None, revision=None):
   1021        dest = self._get_browser_download_dir(dest, channel)
   1022 
   1023        filename = f"{self._chromium_package_name}.zip"
   1024 
   1025        if revision is None:
   1026            revision = self._get_chromium_revision(filename, version)
   1027        elif revision == "latest":
   1028            revision = self._get_latest_chromium_revision()
   1029        elif revision == "pinned":
   1030            revision = self._get_pinned_chromium_revision()
   1031 
   1032        url = self._build_snapshots_url(revision, filename)
   1033        self.logger.info(f"Downloading Chromium from {url}")
   1034        resp = get(url)
   1035        installer_path = os.path.join(dest, filename)
   1036        with open(installer_path, "wb") as f:
   1037            f.write(resp.content)
   1038 
   1039        # Revision successfully used. Keep this revision if another component install is needed.
   1040        self.last_revision_used = revision
   1041        with open(os.path.join(dest, "revision"), "w") as f:
   1042            f.write(revision)
   1043        return installer_path
   1044 
   1045    def find_binary(self, venv_path=None, channel=None):
   1046        return self._find_binary_in_directory(self._get_browser_binary_dir(venv_path, channel))
   1047 
   1048    def install(self, dest=None, channel=None, version=None, revision=None):
   1049        dest = self._get_browser_binary_dir(dest, channel)
   1050        installer_path = self.download(dest, channel, version=version, revision=revision)
   1051        with open(installer_path, "rb") as f:
   1052            unzip(f, dest)
   1053        os.remove(installer_path)
   1054        return self._find_binary_in_directory(dest)
   1055 
   1056    def install_webdriver(self, dest=None, channel=None, browser_binary=None, revision=None):
   1057        if dest is None:
   1058            dest = os.pwd
   1059 
   1060        if revision is None:
   1061            # If a revision was not given, we will need to detect the browser version.
   1062            # The ChromeDriver that is installed will match this version.
   1063            revision = self._get_existing_browser_revision(dest, channel)
   1064 
   1065        chromedriver_path = self.install_webdriver_by_version(None, dest, revision)
   1066 
   1067        return chromedriver_path
   1068 
   1069    def webdriver_supports_browser(self, webdriver_binary, browser_binary, browser_channel=None):
   1070        """Check that the browser binary and ChromeDriver versions are a valid match."""
   1071        browser_version = self.version(browser_binary)
   1072        chromedriver_version = self.webdriver_version(webdriver_binary)
   1073 
   1074        if not chromedriver_version:
   1075            self.logger.warning("Unable to get version for ChromeDriver "
   1076                                f"{webdriver_binary}, rejecting it")
   1077            return False
   1078 
   1079        if not browser_version:
   1080            # If we can't get the browser version,
   1081            # we just have to assume the ChromeDriver is good.
   1082            return True
   1083 
   1084        # Because Chromium and its ChromeDriver should be pulled from the
   1085        # same revision number, their version numbers should match exactly.
   1086        if browser_version == chromedriver_version:
   1087            self.logger.debug("Browser and ChromeDriver versions match.")
   1088            return True
   1089        self.logger.warning(f"ChromeDriver version {chromedriver_version} does not match "
   1090                            f"Chromium version {browser_version}.")
   1091        return False
   1092 
   1093 
   1094 class DownloadNotFoundError(Exception):
   1095    """Raised when a download is not found for browser/webdriver installation."""
   1096    pass
   1097 
   1098 
   1099 class Chrome(ChromeChromiumBase):
   1100    """Chrome-specific interface.
   1101 
   1102    Includes browser binary installation and detection.
   1103    Webdriver installation and wptrunner setup shared in base class with Chromium.
   1104 
   1105    For a detailed description on the installation and detection of these browser components,
   1106    see https://web-platform-tests.org/running-tests/chrome-chromium-installation-detection.html
   1107    """
   1108 
   1109    product = "chrome"
   1110 
   1111    @property
   1112    def _chrome_platform_string(self):
   1113        """A string that represents the platform-based suffix
   1114        of the Chrome for Testing downloads.
   1115        """
   1116        if self.platform in ("Linux", "Win"):
   1117            bits = "64" if uname.machine == "x86_64" else "32"
   1118        elif self.platform == "Mac":
   1119            bits = "-arm64" if uname.machine == "arm64" else "-x64"
   1120        else:
   1121            bits = ""
   1122        return f"{self.platform.lower()}{bits}"
   1123 
   1124    @property
   1125    def _chrome_package_name(self):
   1126        return f"chrome-{self._chrome_platform_string}"
   1127 
   1128    def _get_build_version(self, version):
   1129        """Convert a Chrome/ChromeDriver version into MAJOR.MINOR.BUILD format."""
   1130        version_parts = version.split(".")
   1131        if len(version_parts) < 3:
   1132            self.logger.info(f"Version {version} could not be formatted for build matching.")
   1133            return None
   1134        return ".".join(version_parts[0:3])
   1135 
   1136    def _get_webdriver_url(self, version, channel):
   1137        """Get a ChromeDriver URL to download a version of ChromeDriver that matches
   1138        the browser binary version.
   1139 
   1140        Raises: ValueError if the given version string could not be formatted.
   1141 
   1142        Returns: A ChromeDriver download URL that matches the given Chrome version.
   1143        """
   1144        # Remove version suffix if it exists.
   1145        if version:
   1146            version = self._remove_version_suffix(version)
   1147 
   1148        formatted_version = self._get_build_version(version)
   1149        if formatted_version is None:
   1150            raise ValueError(f"Unknown version format: {version}")
   1151        major_version = version.split(".")[0]
   1152 
   1153        # Chrome for Testing only has ChromeDriver downloads available for Chrome 115+.
   1154        # If we are matching an older version of Chrome, use the old ChromeDriver source.
   1155        if int(major_version) < 115:
   1156            return self._get_old_webdriver_url(formatted_version)
   1157 
   1158        # Check if a file exists containing the matching ChromeDriver version download URL.
   1159        # This is generated when installing Chrome for Testing using the install command.
   1160        download_url_reference_file = os.path.join(
   1161            self._get_browser_binary_dir(None, channel), CHROMEDRIVER_SAVED_DOWNLOAD_FILE)
   1162        if os.path.isfile(download_url_reference_file):
   1163            self.logger.info("Download info for matching ChromeDriver version found.")
   1164            with open(download_url_reference_file, "r") as f:
   1165                return f.read()
   1166 
   1167        # If no ChromeDriver download URL reference file exists,
   1168        # try to find a download URL based on the build version.
   1169        self.logger.info(f"Searching for ChromeDriver downloads for version {version}.")
   1170        download_url = self._get_webdriver_url_by_build(formatted_version)
   1171        if download_url is None:
   1172            milestone = version.split('.')[0]
   1173            self.logger.info(f'No ChromeDriver download found for build {formatted_version}. '
   1174                             f'Finding latest available download for milestone {milestone}')
   1175            download_url = self._get_webdriver_url_by_milestone(milestone)
   1176        return download_url
   1177 
   1178    def _get_old_webdriver_url(self, version):
   1179        """Find a ChromeDriver download URL for Chrome version <= 114
   1180 
   1181        Raises: DownloadNotFoundError if no ChromeDriver download URL is found
   1182                to match the given Chrome binary.
   1183 
   1184        Returns: A ChromeDriver download URL that matches the given Chrome version.
   1185        """
   1186        latest_url = ("https://chromedriver.storage.googleapis.com/LATEST_RELEASE_"
   1187                      f"{version}")
   1188        try:
   1189            latest = get(latest_url).text.strip()
   1190        except requests.RequestException as e:
   1191            raise DownloadNotFoundError("No matching ChromeDriver download"
   1192                                        f" found for version {version}.", e)
   1193 
   1194        filename = f"chromedriver_{self._chromedriver_platform_string}.zip"
   1195        return f"https://chromedriver.storage.googleapis.com/{latest}/{filename}"
   1196 
   1197    def _get_webdriver_url_by_build(self, version):
   1198        """Find a ChromeDriver download URL based on a MAJOR.MINOR.BUILD version.
   1199 
   1200        Raises: RequestException if a bad responses is received from
   1201                Chrome for Testing sources.
   1202 
   1203        Returns: Download URL string or None if no matching build is found.
   1204        """
   1205        try:
   1206            # Get a list of builds with download URLs from Chrome for Testing.
   1207            resp = get(f"{CHROME_FOR_TESTING_ROOT_URL}"
   1208                       "latest-patch-versions-per-build-with-downloads.json")
   1209        except requests.RequestException as e:
   1210            raise requests.RequestException(
   1211                "Chrome for Testing versions not found", e)
   1212        builds_json = resp.json()
   1213        builds_dict = builds_json["builds"]
   1214        if version not in builds_dict:
   1215            self.logger.info(f"No builds found for version {version}.")
   1216            return None
   1217        download_info = builds_dict[version]["downloads"]
   1218        if "chromedriver" not in download_info:
   1219            self.logger.info(f"No ChromeDriver download found for build {version}")
   1220            return None
   1221        downloads_for_platform = [d for d in download_info["chromedriver"]
   1222                                  if d["platform"] == self._chrome_platform_string]
   1223        if len(downloads_for_platform) == 0:
   1224            self.logger.info(f"No ChromeDriver download found for build {version}"
   1225                             f"of platform {self.platform}")
   1226            return None
   1227        return downloads_for_platform[0]["url"]
   1228 
   1229    def _get_webdriver_url_by_milestone(self, milestone):
   1230        """Find a ChromeDriver download URL that is the latest available
   1231        for a Chrome milestone.
   1232 
   1233        Raises: RequestException if a bad responses is received from
   1234                Chrome for Testing sources.
   1235 
   1236        Returns: Download URL string or None if no matching milestone is found.
   1237        """
   1238 
   1239        try:
   1240            # Get a list of builds with download URLs from Chrome for Testing.
   1241            resp = get(f"{CHROME_FOR_TESTING_ROOT_URL}"
   1242                       "latest-versions-per-milestone.json")
   1243        except requests.RequestException as e:
   1244            raise requests.RequestException(
   1245                "Chrome for Testing versions not found", e)
   1246        milestones_json = resp.json()
   1247        milestones_dict = milestones_json["milestones"]
   1248        if milestone not in milestones_dict:
   1249            self.logger.info(f"No latest version found for milestone {milestone}.")
   1250            return None
   1251        version_available = self._get_build_version(
   1252            milestones_dict[milestone]["version"])
   1253 
   1254        return self._get_webdriver_url_by_build(version_available)
   1255 
   1256    def _get_download_urls_by_version(self, version):
   1257        """Find Chrome for Testing and ChromeDriver download URLs matching a specific version.
   1258 
   1259        Raises: DownloadNotFoundError if no download is found for the given version or platform.
   1260                RequestException if a bad responses is received from
   1261                Chrome for Testing sources.
   1262 
   1263        Returns: Both binary downloads for Chrome and ChromeDriver.
   1264        """
   1265        try:
   1266            # Get a list of versions with download URLs from Chrome for Testing.
   1267            resp = get(f"{CHROME_FOR_TESTING_ROOT_URL}"
   1268                       "known-good-versions-with-downloads.json")
   1269        except requests.RequestException as e:
   1270            raise requests.RequestException(
   1271                "Chrome for Testing versions not found", e)
   1272        versions_json = resp.json()
   1273        versions_list = versions_json["versions"]
   1274        # Attempt to find a version match in the list of available downloads.
   1275        matching_versions = [v for v in versions_list if v["version"] == version]
   1276        if len(matching_versions) == 0:
   1277            raise DownloadNotFoundError(f"No Chrome for Testing download found for {version}")
   1278 
   1279        download_info = matching_versions[0]["downloads"]
   1280        # Find the download url that matches the current platform.
   1281        browser_download_urls = [d for d in download_info["chrome"]
   1282                                 if d["platform"] == self._chrome_platform_string]
   1283        if len(browser_download_urls) == 0:
   1284            raise DownloadNotFoundError(
   1285                f"No Chrome for Testing download found for {self.platform} of version {version}")
   1286        browser_download_url = browser_download_urls[0]["url"]
   1287 
   1288        # Get the corresponding ChromeDriver download URL for later use.
   1289        chromedriver_download_urls = [d for d in download_info["chromedriver"]
   1290                                      if d["platform"] == self._chrome_platform_string]
   1291        if len(chromedriver_download_urls) == 0:
   1292            # Some older versions of Chrome for Testing
   1293            # do not have a matching ChromeDriver download.
   1294            raise DownloadNotFoundError(
   1295                f"ChromeDriver download does not exist for version {version}")
   1296        chromedriver_url = chromedriver_download_urls[0]["url"]
   1297 
   1298        return browser_download_url, chromedriver_url
   1299 
   1300    def _get_download_urls_by_channel(self, channel):
   1301        """Find Chrome for Testing and ChromeDriver download URLs matching the given channel.
   1302 
   1303        Raises: DownloadNotFoundError if no download is found for the given channel or platform.
   1304                RequestException if a bad responses is received from Chrome for Testing sources.
   1305 
   1306        Returns: Both binary downloads for Chrome and ChromeDriver.
   1307        """
   1308        try:
   1309            resp = get(f"{CHROME_FOR_TESTING_ROOT_URL}"
   1310                       "last-known-good-versions-with-downloads.json")
   1311        except requests.RequestException as e:
   1312            raise requests.RequestException(
   1313                "Chrome for Testing versions not found", e)
   1314        channels_json = resp.json()
   1315        download_info = channels_json["channels"][channel.capitalize()]["downloads"]["chrome"]
   1316 
   1317        # Find the download URL that matches the current platform.
   1318        matching_download_urls = [d for d in download_info
   1319                                  if d["platform"] == self._chrome_platform_string]
   1320        if len(matching_download_urls) == 0:
   1321            raise DownloadNotFoundError("No matching download for platform "
   1322                                        f"{self.platform} of channel \"{channel}\".")
   1323 
   1324        browser_download_url = matching_download_urls[0]["url"]
   1325 
   1326        # Get the corresponding ChromeDriver download URL for later use.
   1327        chromedriver_download_info = (
   1328            channels_json["channels"][channel.capitalize()]["downloads"]["chromedriver"])
   1329 
   1330        matching_chromedriver_urls = [d for d in chromedriver_download_info
   1331                                      if d["platform"] == self._chrome_platform_string]
   1332        if len(matching_chromedriver_urls) == 0:
   1333            raise DownloadNotFoundError(
   1334                f"No ChromeDriver download found in Chrome for Testing {channel}.")
   1335        chromedriver_url = matching_chromedriver_urls[0]["url"]
   1336 
   1337        return browser_download_url, chromedriver_url
   1338 
   1339    def _save_chromedriver_download_info(self, dest, url):
   1340        """Save the download URL of a ChromeDriver binary that matches the browser.
   1341        This will allow for easy version matching, even in separate CLI invocations.
   1342        """
   1343        with open(os.path.join(dest, CHROMEDRIVER_SAVED_DOWNLOAD_FILE), "w") as f:
   1344            f.write(url)
   1345 
   1346    def download(self, dest=None, channel="canary", rename=None, version=None):
   1347        """Download Chrome for Testing. For more information,
   1348        see: https://github.com/GoogleChromeLabs/chrome-for-testing
   1349        """
   1350        dest = self._get_browser_binary_dir(None, channel)
   1351        filename = f"{self._chrome_package_name}.zip"
   1352 
   1353        # If a version has been supplied, try to find a download to match that version.
   1354        # Otherwise, find a download for the specified channel.
   1355        if version is not None:
   1356            download_url, chromedriver_url = self._get_download_urls_by_version(version)
   1357        else:
   1358            download_url, chromedriver_url = self._get_download_urls_by_channel(channel)
   1359 
   1360        self.logger.info(f"Downloading Chrome for Testing from {download_url}")
   1361        resp = get(download_url)
   1362        installer_path = os.path.join(dest, filename)
   1363        with open(installer_path, "wb") as f:
   1364            f.write(resp.content)
   1365 
   1366        # Save the ChromeDriver download URL for use if a matching ChromeDriver
   1367        # needs to be downloaded in a separate install invocation.
   1368        self._save_chromedriver_download_info(dest, chromedriver_url)
   1369 
   1370        return installer_path
   1371 
   1372    def _find_binary_in_directory(self, directory):
   1373        """Search for Chrome for Testing browser binary in a given directory."""
   1374        if uname[0] == "Darwin":
   1375            return which(
   1376                "Google Chrome for Testing",
   1377                path=os.path.join(directory,
   1378                                  self._chrome_package_name,
   1379                                  "Google Chrome for Testing.app",
   1380                                  "Contents",
   1381                                  "MacOS"))
   1382        # "which" will add .exe on Windows automatically.
   1383        return which("chrome", path=os.path.join(directory, self._chrome_package_name))
   1384 
   1385    def find_binary(self, venv_path=None, channel=None):
   1386        # Check for binary in venv first.
   1387        path = self._find_binary_in_directory(self._get_browser_binary_dir(venv_path, channel))
   1388        if path is not None:
   1389            return path
   1390 
   1391        if uname[0] == "Linux":
   1392            name = "google-chrome"
   1393            if channel == "stable":
   1394                name += "-stable"
   1395            elif channel == "beta":
   1396                name += "-beta"
   1397            elif channel == "dev":
   1398                name += "-unstable"
   1399            # No Canary on Linux.
   1400            return which(name)
   1401        if uname[0] == "Darwin":
   1402            suffix = ""
   1403            if channel in ("beta", "dev", "canary"):
   1404                suffix = " " + channel.capitalize()
   1405            path = f"/Applications/Google Chrome{suffix}.app/Contents/MacOS/Google Chrome{suffix}"
   1406            return path if os.path.isfile(path) else None
   1407        if uname[0] == "Windows":
   1408            name = "Chrome"
   1409            if channel == "beta":
   1410                name += " Beta"
   1411            elif channel == "dev":
   1412                name += " Dev"
   1413            path = os.path.expandvars(fr"$PROGRAMFILES\Google\{name}\Application\chrome.exe")
   1414            if channel == "canary":
   1415                path = os.path.expandvars(r"$LOCALAPPDATA\Google\Chrome SxS\Application\chrome.exe")
   1416            return path if os.path.isfile(path) else None
   1417        self.logger.warning("Unable to find the browser binary.")
   1418        return None
   1419 
   1420    def install(self, dest=None, channel=None, version=None):
   1421        dest = self._get_browser_binary_dir(dest, channel)
   1422        installer_path = self.download(dest=dest, channel=channel, version=version)
   1423        with open(installer_path, "rb") as f:
   1424            unzip(f, dest)
   1425        os.remove(installer_path)
   1426        return self._find_binary_in_directory(dest)
   1427 
   1428    def install_webdriver(self, dest=None, channel=None, browser_binary=None):
   1429        if dest is None:
   1430            dest = os.pwd
   1431 
   1432        # Detect the browser version.
   1433        # The ChromeDriver that is installed will match this version.
   1434        if browser_binary is None:
   1435            # If a browser binary path was not given, detect a valid path.
   1436            browser_binary = self.find_binary(channel=channel)
   1437            # We need a browser to version match, so if a browser binary path
   1438            # was not given and cannot be detected, raise an error.
   1439            if browser_binary is None:
   1440                raise FileNotFoundError("No browser binary detected. "
   1441                                        "Cannot install ChromeDriver without a browser version.")
   1442 
   1443        version = self.version(browser_binary)
   1444        if version is None:
   1445            # Check if the user has given a Chromium binary.
   1446            chromium = Chromium(self.logger)
   1447            if chromium.version(browser_binary):
   1448                raise ValueError("Provided binary is a Chromium binary and should be run using "
   1449                                 "\"./wpt run chromium\" or similar.")
   1450            raise ValueError(f"Unable to detect browser version from binary at {browser_binary}. "
   1451                             " Cannot install ChromeDriver without a valid version to match.")
   1452 
   1453        chromedriver_path = self.install_webdriver_by_version(version, dest, channel)
   1454 
   1455        return chromedriver_path
   1456 
   1457    def install_webdriver_by_version(self, version, dest, channel):
   1458        # Set the destination to a specific "chrome" folder to not overwrite or remove
   1459        # ChromeDriver versions used for Chromium.
   1460        dest = os.path.join(dest, self.product)
   1461        self._remove_existing_chromedriver_binary(dest)
   1462 
   1463        url = self._get_webdriver_url(version, channel)
   1464        if url is None:
   1465            raise DownloadNotFoundError(
   1466                f"No ChromeDriver download found to match browser version {version}")
   1467        self.logger.info(f"Downloading ChromeDriver from {url}")
   1468        unzip(get(url).raw, dest)
   1469 
   1470        chromedriver_dir = os.path.join(
   1471            dest, f"chromedriver-{self._chrome_platform_string}")
   1472        chromedriver_path = which("chromedriver", path=chromedriver_dir)
   1473 
   1474        if chromedriver_path is not None:
   1475            shutil.move(chromedriver_path, dest)
   1476            rmtree(chromedriver_dir)
   1477 
   1478        chromedriver_path = which("chromedriver", path=dest)
   1479        if chromedriver_path is None:
   1480            raise FileNotFoundError("ChromeDriver could not be detected after installation.")
   1481        return chromedriver_path
   1482 
   1483    def webdriver_supports_browser(self, webdriver_binary, browser_binary, browser_channel):
   1484        """Check that the browser binary and ChromeDriver versions are a valid match."""
   1485        browser_version = self.version(browser_binary)
   1486        chromedriver_version = self.webdriver_version(webdriver_binary)
   1487 
   1488        if not chromedriver_version:
   1489            self.logger.warning("Unable to get version for ChromeDriver "
   1490                                f"{webdriver_binary}, rejecting it")
   1491            return False
   1492 
   1493        # TODO(DanielRyanSmith): Determine if this version logic fail case is
   1494        # still necessary and remove it if it isn't.
   1495        if not browser_version:
   1496            # If we can't get the browser version,
   1497            # we just have to assume the ChromeDriver is good.
   1498            return True
   1499 
   1500        # Format versions for comparison.
   1501        browser_version = self._get_build_version(browser_version)
   1502        chromedriver_version = self._get_build_version(chromedriver_version)
   1503 
   1504        # Chrome and ChromeDriver versions should match on the same MAJOR.MINOR.BUILD version.
   1505        if browser_version is not None and browser_version != chromedriver_version:
   1506            # Consider the same milestone as matching.
   1507            # Workaround for https://github.com/web-platform-tests/wpt/issues/42545
   1508            # TODO(DanielRyanSmith): Remove this logic when browser binary is
   1509            # downloaded from Chrome for Testing in CI runs.
   1510            browser_milestone = browser_version.split('.')[0]
   1511            chromedriver_milestone = chromedriver_version.split('.')[0]
   1512            if browser_milestone != chromedriver_milestone:
   1513                self.logger.warning(
   1514                    f"ChromeDriver {chromedriver_version} does not match Chrome {browser_version}")
   1515                return False
   1516        return True
   1517 
   1518    def version(self, binary=None, webdriver_binary=None):
   1519        """Get version string from browser binary."""
   1520        if not binary:
   1521            self.logger.warning("No browser binary provided.")
   1522            return None
   1523        if uname[0] == "Windows":
   1524            return _get_fileversion(binary, self.logger)
   1525 
   1526        try:
   1527            version_string = call(binary, "--version").strip()
   1528        except (subprocess.CalledProcessError, OSError) as e:
   1529            self.logger.warning(f"Failed to call {binary}: {e}")
   1530            return None
   1531        m = re.match(r"(?:Google Chrome for Testing|Google Chrome) (.*)", version_string)
   1532        if not m:
   1533            self.logger.warning(f"Failed to extract version from: {version_string}")
   1534            return None
   1535        return m.group(1)
   1536 
   1537    def webdriver_version(self, webdriver_binary):
   1538        """Get version string from ChromeDriver binary."""
   1539        if webdriver_binary is None:
   1540            self.logger.warning("No valid webdriver supplied to detect version.")
   1541            return None
   1542 
   1543        try:
   1544            version_string = call(webdriver_binary, "--version").strip()
   1545        except (subprocess.CalledProcessError, OSError) as e:
   1546            self.logger.warning(f"Failed to call {webdriver_binary}: {e}")
   1547            return None
   1548        m = re.match(r"ChromeDriver ([0-9][0-9.]*)", version_string)
   1549        if not m:
   1550            self.logger.warning(f"Failed to extract version from: {version_string}")
   1551            return None
   1552        return m.group(1)
   1553 
   1554 
   1555 class HeadlessShell(ChromeChromiumBase):
   1556    """Interface for the Chromium headless shell [0].
   1557 
   1558    [0]: https://chromium.googlesource.com/chromium/src/+/HEAD/headless/README.md
   1559    """
   1560 
   1561    product = "headless_shell"
   1562    requirements = None
   1563 
   1564    def download(self, dest=None, channel=None, rename=None):
   1565        # TODO(crbug.com/344669542): Download binaries via CfT.
   1566        raise NotImplementedError
   1567 
   1568    def install(self, dest=None, channel=None):
   1569        raise NotImplementedError
   1570 
   1571    def install_webdriver(self, dest=None, channel=None, browser_binary=None):
   1572        raise NotImplementedError
   1573 
   1574    def find_binary(self, venv_path=None, channel=None):
   1575        # `which()` adds `.exe` extension automatically for Windows.
   1576        # Chromium builds an executable named `headless_shell`, whereas CfT
   1577        # ships under the name `chrome-headless-shell`.
   1578        return which("headless_shell") or which("chrome-headless-shell")
   1579 
   1580    def version(self, binary=None, webdriver_binary=None):
   1581        # TODO(crbug.com/327767951): Support `headless_shell --version`.
   1582        return "N/A"
   1583 
   1584 
   1585 class ChromeAndroidBase(Browser, metaclass=ABCMeta):
   1586    """A base class for ChromeAndroid and AndroidWebView.
   1587 
   1588    On Android, WebView is based on Chromium open source project, and on some
   1589    versions of Android we share the library with Chrome. Therefore, we have
   1590    a very similar WPT runner implementation.
   1591    Includes webdriver installation.
   1592    """
   1593 
   1594    def __init__(self, logger):
   1595        super().__init__(logger)
   1596        self.device_serial = None
   1597        self.adb_binary = "adb"
   1598 
   1599    def download(self, dest=None, channel=None, rename=None):
   1600        raise NotImplementedError
   1601 
   1602    def install(self, dest=None, channel=None):
   1603        raise NotImplementedError
   1604 
   1605    @abstractmethod
   1606    def find_binary(self, venv_path=None, channel=None):
   1607        raise NotImplementedError
   1608 
   1609    def find_webdriver(self, venv_path=None, channel=None):
   1610        return which("chromedriver")
   1611 
   1612    def install_webdriver(self, dest=None, channel=None, browser_binary=None):
   1613        if browser_binary is None:
   1614            browser_binary = self.find_binary(channel)
   1615        chrome = Chrome(self.logger)
   1616        return chrome.install_webdriver_by_version(self.version(browser_binary), dest, channel)
   1617 
   1618    def version(self, binary=None, webdriver_binary=None):
   1619        if not binary:
   1620            self.logger.warning("No package name provided.")
   1621            return None
   1622 
   1623        command = [self.adb_binary]
   1624        if self.device_serial:
   1625            # Assume we have same version of browser on all devices
   1626            command.extend(['-s', self.device_serial[0]])
   1627        command.extend(['shell', 'dumpsys', 'package', binary])
   1628        try:
   1629            output = call(*command)
   1630        except (subprocess.CalledProcessError, OSError):
   1631            self.logger.warning("Failed to call %s" % " ".join(command))
   1632            return None
   1633        match = re.search(r'versionName=(.*)', output)
   1634        if not match:
   1635            self.logger.warning("Failed to find versionName")
   1636            return None
   1637        return match.group(1)
   1638 
   1639 
   1640 class ChromeAndroid(ChromeAndroidBase):
   1641    """Chrome-specific interface for Android.
   1642    """
   1643 
   1644    product = "chrome_android"
   1645    requirements = "requirements_chromium.txt"
   1646 
   1647    def find_binary(self, venv_path=None, channel=None):
   1648        if channel in ("beta", "dev", "canary"):
   1649            return "com.chrome." + channel
   1650        return "com.android.chrome"
   1651 
   1652 
   1653 class AndroidWebview(ChromeAndroidBase):
   1654    """Webview-specific interface for Android.
   1655 
   1656    Design doc:
   1657    https://docs.google.com/document/d/19cGz31lzCBdpbtSC92svXlhlhn68hrsVwSB7cfZt54o/view
   1658    """
   1659 
   1660    product = "android_webview"
   1661    requirements = "requirements_chromium.txt"
   1662 
   1663    def find_binary(self, venv_path=None, channel=None):
   1664        # Just get the current package name of the WebView provider.
   1665        # For WebView, it is not trivial to change the WebView provider, so
   1666        # we will just grab whatever is available.
   1667        # https://chromium.googlesource.com/chromium/src/+/HEAD/android_webview/docs/channels.md
   1668        command = [self.adb_binary]
   1669        if self.device_serial:
   1670            command.extend(['-s', self.device_serial[0]])
   1671        command.extend(['shell', 'dumpsys', 'webviewupdate'])
   1672        try:
   1673            output = call(*command)
   1674        except (subprocess.CalledProcessError, OSError):
   1675            self.logger.warning("Failed to call %s" % " ".join(command))
   1676            return None
   1677        m = re.search(r'^\s*Current WebView package \(name, version\): \((.*), ([0-9.]*)\)$',
   1678                      output, re.M)
   1679        if m is None:
   1680            self.logger.warning("Unable to find current WebView package in dumpsys output")
   1681            return None
   1682        self.logger.warning("Final package name: " + m.group(1))
   1683        return m.group(1)
   1684 
   1685 
   1686 class ChromeiOS(Browser):
   1687    """Chrome-specific interface for iOS.
   1688    """
   1689 
   1690    product = "chrome_ios"
   1691    requirements = None
   1692 
   1693    def download(self, dest=None, channel=None, rename=None):
   1694        raise NotImplementedError
   1695 
   1696    def install(self, dest=None, channel=None):
   1697        raise NotImplementedError
   1698 
   1699    def find_binary(self, venv_path=None, channel=None):
   1700        raise NotImplementedError
   1701 
   1702    def find_webdriver(self, venv_path=None, channel=None):
   1703        raise NotImplementedError
   1704 
   1705    def install_webdriver(self, dest=None, channel=None, browser_binary=None):
   1706        raise NotImplementedError
   1707 
   1708    def version(self, binary=None, webdriver_binary=None):
   1709        if webdriver_binary is None:
   1710            self.logger.warning(
   1711                "Cannot find ChromeiOS version without CWTChromeDriver")
   1712            return None
   1713        # Use `chrome iOS driver --version` to get the version. Example output:
   1714        # "125.0.6378.0"
   1715        try:
   1716            version_string = call(webdriver_binary, "--version").strip()
   1717        except subprocess.CalledProcessError as e:
   1718            self.logger.warning(f"Failed to call {webdriver_binary}: {e}")
   1719            return None
   1720        m = re.match(r"[\d][\d\.]*", version_string)
   1721        if not m:
   1722            self.logger.warning(
   1723                f"Failed to extract version from: {version_string}")
   1724            return None
   1725        return m.group(0)
   1726 
   1727 
   1728 class Opera(Browser):
   1729    """Opera-specific interface.
   1730 
   1731    Includes webdriver installation, and wptrunner setup methods.
   1732    """
   1733 
   1734    product = "opera"
   1735    requirements = "requirements_opera.txt"
   1736 
   1737    @property
   1738    def binary(self):
   1739        if uname[0] == "Linux":
   1740            return "/usr/bin/opera"
   1741        # TODO Windows, Mac?
   1742        self.logger.warning("Unable to find the browser binary.")
   1743        return None
   1744 
   1745    def download(self, dest=None, channel=None, rename=None):
   1746        raise NotImplementedError
   1747 
   1748    def install(self, dest=None, channel=None):
   1749        raise NotImplementedError
   1750 
   1751    def platform_string(self):
   1752        platform = {
   1753            "Linux": "linux",
   1754            "Windows": "win",
   1755            "Darwin": "mac"
   1756        }.get(uname[0])
   1757 
   1758        if platform is None:
   1759            raise ValueError("Unable to construct a valid Opera package name for current platform")
   1760 
   1761        if platform == "linux":
   1762            bits = "64" if uname[4] == "x86_64" else "32"
   1763        elif platform == "mac":
   1764            bits = "64"
   1765        elif platform == "win":
   1766            bits = "32"
   1767 
   1768        return "%s%s" % (platform, bits)
   1769 
   1770    def find_binary(self, venv_path=None, channel=None):
   1771        raise NotImplementedError
   1772 
   1773    def find_webdriver(self, venv_path=None, channel=None):
   1774        return which("operadriver")
   1775 
   1776    def install_webdriver(self, dest=None, channel=None, browser_binary=None):
   1777        if dest is None:
   1778            dest = os.pwd
   1779        latest = get("https://api.github.com/repos/operasoftware/operachromiumdriver/releases/latest").json()["tag_name"]
   1780        url = "https://github.com/operasoftware/operachromiumdriver/releases/download/%s/operadriver_%s.zip" % (latest,
   1781                                                                                                                self.platform_string())
   1782        unzip(get(url).raw, dest)
   1783 
   1784        operadriver_dir = os.path.join(dest, "operadriver_%s" % self.platform_string())
   1785        shutil.move(os.path.join(operadriver_dir, "operadriver"), dest)
   1786        rmtree(operadriver_dir)
   1787 
   1788        path = which("operadriver")
   1789        st = os.stat(path)
   1790        os.chmod(path, st.st_mode | stat.S_IEXEC)
   1791        return path
   1792 
   1793    def version(self, binary=None, webdriver_binary=None):
   1794        """Retrieve the release version of the installed browser."""
   1795        binary = binary or self.binary
   1796        try:
   1797            output = call(binary, "--version")
   1798        except subprocess.CalledProcessError:
   1799            self.logger.warning("Failed to call %s" % binary)
   1800            return None
   1801        m = re.search(r"[0-9\.]+( [a-z]+)?$", output.strip())
   1802        if m:
   1803            return m.group(0)
   1804 
   1805 
   1806 class Edge(Browser):
   1807    """Microsoft Edge Chromium Browser class."""
   1808 
   1809    product = "edge"
   1810    requirements = "requirements_chromium.txt"
   1811    platform = {
   1812        "Linux": "linux",
   1813        "Windows": "win",
   1814        "Darwin": "macos"
   1815    }.get(uname[0])
   1816 
   1817    def _get_build_version(self, version):
   1818        """Convert a Edge/MSEdgeDriver version into MAJOR.MINOR.BUILD format."""
   1819        version_parts = version.split(".")
   1820        if len(version_parts) < 3:
   1821            self.logger.info(f"Version {version} could not be formatted for build matching.")
   1822            return None
   1823        return ".".join(version_parts[0:3])
   1824 
   1825    def _remove_existing_edgedriver_binary(self, path):
   1826        """Remove an existing MSEdgeDriver for this product if it exists
   1827        in the virtual environment.
   1828        """
   1829        # There may be an existing MSEdgeDriver binary from a previous install.
   1830        # To provide a clean install experience, remove the old binary - this
   1831        # avoids tricky issues like unzipping over a read-only file.
   1832        existing_edgedriver_path = which("MSEdgeDriver", path=path)
   1833        if existing_edgedriver_path:
   1834            self.logger.info(f"Removing existing MSEdgeDriver binary: {existing_edgedriver_path}")
   1835            os.chmod(existing_edgedriver_path, stat.S_IWUSR)
   1836            os.remove(existing_edgedriver_path)
   1837        existing_driver_notes_path = os.path.join(path, "Driver_notes")
   1838        if os.path.isdir(existing_driver_notes_path):
   1839            self.logger.info(f"Removing existing MSEdgeDriver binary: {existing_driver_notes_path}")
   1840            rmtree(existing_driver_notes_path)
   1841 
   1842    def download(self, dest=None, channel=None, rename=None):
   1843        raise NotImplementedError
   1844 
   1845    def install_mojojs(self, dest, browser_binary):
   1846        # MojoJS is platform agnostic, but the version number must be an
   1847        # exact match of the Edge version to be compatible.
   1848        edge_version = self.version(binary=browser_binary)
   1849        if not edge_version:
   1850            return None
   1851 
   1852        try:
   1853            # MojoJS version url must match the browser binary version exactly.
   1854            url = ("https://msedgedriver.microsoft.com/wpt-mojom/"
   1855                   f"{edge_version}/linux64/mojojs.zip")
   1856            # Check the status without downloading the content (this is a
   1857            # streaming request).
   1858            get(url)
   1859        except requests.RequestException:
   1860            self.logger.error("A valid MojoJS version cannot be found "
   1861                              f"for browser binary version {edge_version}.")
   1862            return None
   1863 
   1864        extracted = os.path.join(dest, "mojojs", "gen")
   1865        last_url_file = os.path.join(extracted, "DOWNLOADED_FROM")
   1866        if os.path.exists(last_url_file):
   1867            with open(last_url_file, "rt") as f:
   1868                last_url = f.read().strip()
   1869            if last_url == url:
   1870                self.logger.info("Mojo bindings already up to date")
   1871                return extracted
   1872            rmtree(extracted)
   1873 
   1874        try:
   1875            self.logger.info(f"Downloading Mojo bindings from {url}")
   1876            unzip(get(url).raw, os.path.join(dest, "mojojs"))
   1877            with open(last_url_file, "wt") as f:
   1878                f.write(url)
   1879            return extracted
   1880        except Exception as e:
   1881            self.logger.error(f"Cannot enable MojoJS: {e}")
   1882            return None
   1883 
   1884    def find_binary(self, venv_path=None, channel=None):
   1885        # TODO: Check for binary in virtual environment first
   1886        if self.platform == "linux":
   1887            name = "microsoft-edge"
   1888            if channel == "stable":
   1889                name += "-stable"
   1890            elif channel == "beta":
   1891                name += "-beta"
   1892            elif channel == "dev":
   1893                name += "-dev"
   1894            # No Canary on Linux.
   1895            return which(name)
   1896        if self.platform == "macos":
   1897            suffix = ""
   1898            if channel in ("beta", "dev", "canary"):
   1899                suffix = " " + channel.capitalize()
   1900            path = f"/Applications/Microsoft Edge{suffix}.app/Contents/MacOS/Microsoft Edge{suffix}"
   1901            return path if os.path.isfile(path) else None
   1902        if self.platform == "win":
   1903            suffix = ""
   1904            if channel in ("beta", "dev"):
   1905                suffix = " " + channel.capitalize()
   1906            winpaths = [os.path.expandvars(fr"%PROGRAMFILES%\Microsoft\Edge{suffix}\Application"),
   1907                        os.path.expandvars(fr"%programfiles(x86)%\Microsoft\Edge{suffix}\Application")]
   1908            path = which("msedge.exe", path=os.pathsep.join(winpaths))
   1909            if channel == "canary":
   1910                path = os.path.expandvars(r"%LOCALAPPDATA%\Microsoft\Edge SxS\Application\msedge.exe")
   1911            return path if os.path.isfile(path) else None
   1912        self.logger.warning("Unable to find the browser binary.")
   1913        return None
   1914 
   1915    def find_webdriver(self, venv_path=None, channel=None):
   1916        return which("msedgedriver")
   1917 
   1918    def install(self, dest=None, channel=None):
   1919        raise NotImplementedError
   1920 
   1921    def install_webdriver(self, dest=None, channel=None, browser_binary=None):
   1922        if dest is None:
   1923            dest = os.pwd
   1924 
   1925        # Detect the browser version.
   1926        # The MSEdgeDriver that is installed will match this version.
   1927        if browser_binary is None:
   1928            # If a browser binary path was not given, detect a valid path.
   1929            browser_binary = self.find_binary(channel=channel)
   1930            # We need a browser to version match, so if a browser binary path
   1931            # was not given and cannot be detected, raise an error.
   1932            if browser_binary is None:
   1933                raise FileNotFoundError("No browser binary detected. "
   1934                                        "Cannot install MSEdgeDriver without a browser version.")
   1935 
   1936        version = self.version(browser_binary)
   1937        if version is None:
   1938            raise ValueError(f"Unable to detect browser version from binary at {browser_binary}. "
   1939                             "Cannot install MSEdgeDriver without a valid version to match.")
   1940 
   1941        edgedriver_path = self.install_webdriver_by_version(version, dest, channel)
   1942 
   1943        return edgedriver_path
   1944 
   1945    def install_webdriver_by_version(self, version, dest, channel):
   1946        self._remove_existing_edgedriver_binary(dest)
   1947 
   1948        if self.platform == "linux":
   1949            bits = "linux64"
   1950        elif self.platform == "macos":
   1951            bits = "mac64"
   1952        elif self.platform == "win":
   1953            bits = "win64" if uname[4] == "x86_64" else "win32"
   1954 
   1955        url = f"https://msedgedriver.microsoft.com/{version}/edgedriver_{bits}.zip"
   1956        self.logger.info(f"Downloading MSEdgeDriver from {url}")
   1957        unzip(get(url).raw, dest)
   1958        edgedriver_path = which("msedgedriver", path=dest)
   1959        assert edgedriver_path is not None
   1960        return edgedriver_path
   1961 
   1962    def webdriver_supports_browser(self, webdriver_binary, browser_binary, browser_channel):
   1963        """Check that the browser binary and MSEdgeDriver versions are a valid match."""
   1964        browser_version = self.version(browser_binary)
   1965        edgedriver_version = self.webdriver_version(webdriver_binary)
   1966 
   1967        if not edgedriver_version:
   1968            self.logger.warning("Unable to get version for MSEdgeDriver "
   1969                                f"{webdriver_binary}, rejecting it")
   1970            return False
   1971 
   1972        if not browser_version:
   1973            # If we can't get the browser version, we just have to assume the
   1974            # MSEdgeDriver is good.
   1975            self.logger.warning("Unable to get version for the browser "
   1976                                f"{browser_binary}, assuming MSEdgeDriver is good.")
   1977            return True
   1978 
   1979        # Check that the EdgeDriver version matches the Edge version.
   1980        browser_version = self._get_build_version(browser_version)
   1981        edgedriver_version = self._get_build_version(edgedriver_version)
   1982 
   1983        # Edge and MSEdgeDriver versions should match on the same MAJOR.MINOR.BUILD version.
   1984        if browser_version is not None and browser_version != edgedriver_version:
   1985            self.logger.warning(
   1986                f"MSEdgeDriver {edgedriver_version} does not match Edge {browser_version}")
   1987            return False
   1988        return True
   1989 
   1990    def version(self, binary=None, webdriver_binary=None):
   1991        """Get version string from browser binary."""
   1992        if not binary:
   1993            self.logger.warning("No browser binary provided.")
   1994            return None
   1995 
   1996        if uname[0] == "Windows":
   1997            return _get_fileversion(binary, self.logger)
   1998 
   1999        try:
   2000            version_string = call(binary, "--version").strip()
   2001        except (subprocess.CalledProcessError, OSError) as e:
   2002            self.logger.warning(f"Failed to call {binary}: {e}")
   2003            return None
   2004        m = re.match(r"Microsoft Edge ([0-9][0-9.]*)", version_string)
   2005        if not m:
   2006            self.logger.warning(f"Failed to extract version from: {version_string}")
   2007            return None
   2008        return m.group(1)
   2009 
   2010    def webdriver_version(self, webdriver_binary):
   2011        """Get version string from MSEdgeDriver binary."""
   2012        if webdriver_binary is None:
   2013            self.logger.warning("No valid webdriver supplied to detect version.")
   2014            return None
   2015 
   2016        try:
   2017            version_string = call(webdriver_binary, "--version").strip()
   2018        except (subprocess.CalledProcessError, OSError) as e:
   2019            self.logger.warning(f"Failed to call {webdriver_binary}: {e}")
   2020            return None
   2021        m = re.match(r"Microsoft Edge WebDriver ([0-9][0-9.]*)", version_string)
   2022        if not m:
   2023            self.logger.warning(f"Failed to extract version from: {version_string}")
   2024            return None
   2025        return m.group(1)
   2026 
   2027 
   2028 class Safari(Browser):
   2029    """Safari-specific interface.
   2030 
   2031    Includes installation, webdriver installation, and wptrunner setup methods.
   2032    """
   2033 
   2034    product = "safari"
   2035    requirements = "requirements_safari.txt"
   2036 
   2037    def _find_downloads(self):
   2038        def text_content(e, __output=None):
   2039            # this doesn't use etree.tostring so that we can add spaces for p and br
   2040            if __output is None:
   2041                __output = []
   2042 
   2043            if e.tag == "p":
   2044                __output.append("\n\n")
   2045 
   2046            if e.tag == "br":
   2047                __output.append("\n")
   2048 
   2049            if e.text is not None:
   2050                __output.append(e.text)
   2051 
   2052            for child in e:
   2053                text_content(child, __output)
   2054                if child.tail is not None:
   2055                    __output.append(child.tail)
   2056 
   2057            return "".join(__output)
   2058 
   2059        self.logger.info("Finding STP download URLs")
   2060        resp = get("https://developer.apple.com/safari/download/")
   2061 
   2062        doc = html5lib.parse(
   2063            resp.content,
   2064            "etree",
   2065            namespaceHTMLElements=False,
   2066            transport_encoding=resp.encoding,
   2067        )
   2068        ascii_ws = re.compile(r"[\x09\x0A\x0C\x0D\x20]+")
   2069 
   2070        downloads = []
   2071        for candidate in doc.iterfind(".//li[@class]"):
   2072            class_names = set(ascii_ws.split(candidate.attrib["class"]))
   2073            if {"download", "dmg", "zip"} & class_names:
   2074                downloads.append(candidate)
   2075 
   2076        # Note we use \s throughout for space as we don't care what form the whitespace takes
   2077        stp_link_text = re.compile(
   2078            r"^\s*Safari\s+Technology\s+Preview\s+(?:[0-9]+\s+)?for\s+macOS"
   2079        )
   2080        requirement = re.compile(
   2081            r"""(?x)  # (extended regexp syntax for comments)
   2082            ^\s*Requires\s+macOS\s+  # Starting with the magic string
   2083            ([0-9]+(?:\.[0-9]+)*)  # A macOS version number of numbers and dots
   2084            (?:\s+beta(?:\s+[0-9]+)?)?  # Optionally a beta, itself optionally with a number (no dots!)
   2085            (?:\s+or\s+later)?  # Optionally an 'or later'
   2086            \.?\s*$  # Optionally ending with a literal dot
   2087            """
   2088        )
   2089 
   2090        stp_downloads = []
   2091        for download in downloads:
   2092            for link in download.iterfind(".//a[@href]"):
   2093                if stp_link_text.search(text_content(link)):
   2094                    break
   2095                else:
   2096                    self.logger.debug("non-matching anchor: " + text_content(link))
   2097            else:
   2098                continue
   2099 
   2100            for el in download.iter():
   2101                # avoid assuming any given element here, just assume it is a single element
   2102                m = requirement.search(text_content(el))
   2103                if m:
   2104                    version = m.group(1)
   2105 
   2106                    # This assumes the current macOS numbering, whereby X.Y is compatible
   2107                    # with X.(Y+1), e.g. 12.4 is compatible with 12.3, but 13.0 isn't
   2108                    # compatible with 12.3.
   2109                    if version.count(".") >= (2 if version.startswith("10.") else 1):
   2110                        spec = SpecifierSet(f"~={version}")
   2111                    else:
   2112                        spec = SpecifierSet(f"=={version}.*")
   2113 
   2114                    stp_downloads.append((spec, link.attrib["href"].strip()))
   2115                    break
   2116            else:
   2117                self.logger.debug(
   2118                    "Found a link but no requirement: " + text_content(download)
   2119                )
   2120 
   2121        if stp_downloads:
   2122            self.logger.info(
   2123                "Found STP URLs for macOS " +
   2124                ", ".join(str(dl[0]) for dl in stp_downloads)
   2125            )
   2126        else:
   2127            self.logger.warning("Did not find any STP URLs")
   2128 
   2129        return stp_downloads
   2130 
   2131    def _download_image(self, downloads, dest, system_version=None):
   2132        if system_version is None:
   2133            system_version, _, _ = platform.mac_ver()
   2134 
   2135        chosen_url = None
   2136        for version_spec, url in downloads:
   2137            if system_version in version_spec:
   2138                self.logger.debug(f"Will download Safari for {version_spec}")
   2139                chosen_url = url
   2140                break
   2141 
   2142        if chosen_url is None:
   2143            raise ValueError(f"no download for {system_version}")
   2144 
   2145        self.logger.info(f"Downloading Safari from {chosen_url}")
   2146        resp = get(chosen_url)
   2147 
   2148        filename = get_download_filename(resp, "SafariTechnologyPreview.dmg")
   2149        installer_path = os.path.join(dest, filename)
   2150        with open(installer_path, "wb") as f:
   2151            f.write(resp.content)
   2152 
   2153        return installer_path
   2154 
   2155    def _download_extract(self, image_path, dest, rename=None):
   2156        with tempfile.TemporaryDirectory() as tmpdir:
   2157            self.logger.debug(f"Mounting {image_path}")
   2158            r = subprocess.run(
   2159                [
   2160                    "hdiutil",
   2161                    "attach",
   2162                    "-readonly",
   2163                    "-mountpoint",
   2164                    tmpdir,
   2165                    "-nobrowse",
   2166                    "-verify",
   2167                    "-noignorebadchecksums",
   2168                    "-autofsck",
   2169                    image_path,
   2170                ],
   2171                encoding="utf-8",
   2172                capture_output=True,
   2173                check=True,
   2174            )
   2175 
   2176            mountpoint = None
   2177            for line in r.stdout.splitlines():
   2178                if not line.startswith("/dev/"):
   2179                    continue
   2180 
   2181                _, _, mountpoint = line.split("\t", 2)
   2182                if mountpoint:
   2183                    break
   2184 
   2185            if mountpoint is None:
   2186                raise ValueError("no volume mounted from image")
   2187 
   2188            pkgs = [p for p in os.listdir(mountpoint) if p.endswith((".pkg", ".mpkg"))]
   2189            if len(pkgs) != 1:
   2190                raise ValueError(
   2191                    f"Expected a single .pkg/.mpkg, found {len(pkgs)}: {', '.join(pkgs)}"
   2192                )
   2193 
   2194            source_path = os.path.join(mountpoint, pkgs[0])
   2195            dest_path = os.path.join(
   2196                dest, (rename + get_ext(pkgs[0])) if rename is not None else pkgs[0]
   2197            )
   2198 
   2199            self.logger.debug(f"Copying {source_path} to {dest_path}")
   2200            shutil.copy2(
   2201                source_path,
   2202                dest_path,
   2203            )
   2204 
   2205            self.logger.debug(f"Unmounting {mountpoint}")
   2206            subprocess.run(
   2207                ["hdiutil", "detach", mountpoint],
   2208                encoding="utf-8",
   2209                capture_output=True,
   2210                check=True,
   2211            )
   2212 
   2213        return dest_path
   2214 
   2215    def download(self, dest=None, channel="preview", rename=None, system_version=None):
   2216        if channel != "preview":
   2217            raise ValueError(f"can only install 'preview', not '{channel}'")
   2218 
   2219        dest = self._get_browser_download_dir(dest, channel)
   2220 
   2221        stp_downloads = self._find_downloads()
   2222 
   2223        with tempfile.TemporaryDirectory() as tmpdir:
   2224            image_path = self._download_image(stp_downloads, tmpdir, system_version)
   2225            return self._download_extract(image_path, dest, rename)
   2226 
   2227    def install(self, dest=None, channel=None):
   2228        # We can't do this because stable/beta releases are system components and STP
   2229        # requires admin permissions to install.
   2230        raise NotImplementedError
   2231 
   2232    def find_binary(self, venv_path=None, channel=None):
   2233        raise NotImplementedError
   2234 
   2235    def find_webdriver(self, venv_path=None, channel=None):
   2236        path = None
   2237        if channel == "preview":
   2238            path = "/Applications/Safari Technology Preview.app/Contents/MacOS"
   2239        return which("safaridriver", path=path)
   2240 
   2241    def install_webdriver(self, dest=None, channel=None, browser_binary=None):
   2242        raise NotImplementedError
   2243 
   2244    def version(self, binary=None, webdriver_binary=None):
   2245        if webdriver_binary is None:
   2246            self.logger.warning("Cannot find Safari version without safaridriver")
   2247            return None
   2248        # Use `safaridriver --version` to get the version. Example output:
   2249        # "Included with Safari 12.1 (14607.1.11)"
   2250        # "Included with Safari Technology Preview (Release 67, 13607.1.9.0.1)"
   2251        # The `--version` flag was added in STP 67, so allow the call to fail.
   2252        try:
   2253            version_string = call(webdriver_binary, "--version").strip()
   2254        except subprocess.CalledProcessError:
   2255            self.logger.warning("Failed to call %s --version" % webdriver_binary)
   2256            return None
   2257        m = re.match(r"Included with Safari (.*)", version_string)
   2258        if not m:
   2259            self.logger.warning("Failed to extract version from: %s" % version_string)
   2260            return None
   2261        return m.group(1)
   2262 
   2263 
   2264 class Servo(Browser):
   2265    """Servo-specific interface."""
   2266 
   2267    product = "servo"
   2268    requirements = None
   2269 
   2270    def platform_components(self):
   2271        platform, triple = {
   2272            ("Darwin", "arm64"): ("mac-arm64", "aarch64-apple-darwin"),
   2273            ("Darwin", "x86_64"): ("mac", "x86_64-apple-darwin"),
   2274            ("Linux", "x86_64"): ("linux", "x86_64-linux-gnu"),
   2275            ("Windows", "AMD64"): ("win", "x86_64-windows-msvc"),
   2276        }.get((uname[0], uname[4]), (None, None))
   2277 
   2278        if platform is None:
   2279            raise ValueError("Unable to construct a valid Servo package for current platform")
   2280 
   2281        if platform == "linux":
   2282            extension = ".tar.gz"
   2283            decompress = untar
   2284        elif platform in ["win", "mac", "mac-arm64"]:
   2285            raise ValueError("Unable to construct a valid Servo package for current platform")
   2286 
   2287        default_filename = f"servo-{triple}"
   2288        return (platform, default_filename, extension, decompress)
   2289 
   2290    def _get(self, channel="nightly"):
   2291        if channel != "nightly":
   2292            raise ValueError("Only nightly versions of Servo are available")
   2293 
   2294        platform, filename, extension, _ = self.platform_components()
   2295        artifact = f"{filename}{extension}"
   2296        return get(f"https://download.servo.org/nightly/{platform}/{artifact}")
   2297 
   2298    def download(self, dest=None, channel="nightly", rename=None):
   2299        if dest is None:
   2300            dest = os.pwd
   2301 
   2302        resp = self._get(dest, channel)
   2303        _, default_filename, extension, _ = self.platform_components()
   2304 
   2305        filename = rename if rename is not None else default_filename
   2306        with open(os.path.join(dest, "%s%s" % (filename, extension,)), "w") as f:
   2307            f.write(resp.content)
   2308 
   2309    def install(self, dest=None, channel="nightly"):
   2310        """Install latest Browser Engine."""
   2311        if dest is None:
   2312            dest = os.pwd
   2313 
   2314        _, _, _, decompress = self.platform_components()
   2315 
   2316        resp = self._get(channel)
   2317        decompress(resp.raw, dest=dest)
   2318        path = which("servo", path=os.path.join(dest, "servo"))
   2319        st = os.stat(path)
   2320        os.chmod(path, st.st_mode | stat.S_IEXEC)
   2321        return path
   2322 
   2323    def find_binary(self, venv_path=None, channel=None):
   2324        path = which("servo", path=os.path.join(venv_path, "servo"))
   2325        if path is None:
   2326            path = which("servo")
   2327        return path
   2328 
   2329    def find_webdriver(self, venv_path=None, channel=None):
   2330        return None
   2331 
   2332    def install_webdriver(self, dest=None, channel=None, browser_binary=None):
   2333        raise NotImplementedError
   2334 
   2335    def version(self, binary=None, webdriver_binary=None):
   2336        """Retrieve the release version of the installed browser."""
   2337        output = call(binary, "--version")
   2338        m = re.search(r"Servo ([0-9\.]+-[a-f0-9]+)?(-dirty)?$", output.strip())
   2339        if m:
   2340            return m.group(0)
   2341 
   2342 
   2343 class ServoWebDriver(Servo):
   2344    product = "servodriver"
   2345 
   2346 
   2347 class Sauce(Browser):
   2348    """Sauce-specific interface."""
   2349 
   2350    product = "sauce"
   2351    requirements = "requirements_sauce.txt"
   2352 
   2353    def download(self, dest=None, channel=None, rename=None):
   2354        raise NotImplementedError
   2355 
   2356    def install(self, dest=None, channel=None):
   2357        raise NotImplementedError
   2358 
   2359    def find_binary(self, venev_path=None, channel=None):
   2360        raise NotImplementedError
   2361 
   2362    def find_webdriver(self, venv_path=None, channel=None):
   2363        raise NotImplementedError
   2364 
   2365    def install_webdriver(self, dest=None, channel=None, browser_binary=None):
   2366        raise NotImplementedError
   2367 
   2368    def version(self, binary=None, webdriver_binary=None):
   2369        return None
   2370 
   2371 
   2372 class WebKit(Browser):
   2373    """WebKit-specific interface."""
   2374 
   2375    product = "webkit"
   2376    requirements = None
   2377 
   2378    def download(self, dest=None, channel=None, rename=None):
   2379        raise NotImplementedError
   2380 
   2381    def install(self, dest=None, channel=None):
   2382        raise NotImplementedError
   2383 
   2384    def find_binary(self, venv_path=None, channel=None):
   2385        return None
   2386 
   2387    def find_webdriver(self, venv_path=None, channel=None):
   2388        return None
   2389 
   2390    def install_webdriver(self, dest=None, channel=None, browser_binary=None):
   2391        raise NotImplementedError
   2392 
   2393    def version(self, binary=None, webdriver_binary=None):
   2394        return None
   2395 
   2396 class Ladybird(Browser):
   2397    product = "ladybird"
   2398    requirements = None
   2399 
   2400    def download(self, dest=None, channel=None, rename=None):
   2401        raise NotImplementedError
   2402 
   2403    def install(self, dest=None, channel=None):
   2404        raise NotImplementedError
   2405 
   2406    def find_binary(self, venv_path=None, channel=None):
   2407        return which("ladybird")
   2408 
   2409    def find_webdriver(self, venv_path=None, channel=None):
   2410        return which("WebDriver")
   2411 
   2412    def install_webdriver(self, dest=None, channel=None, browser_binary=None):
   2413        raise NotImplementedError
   2414 
   2415    def version(self, binary=None, webdriver_binary=None):
   2416        if not binary:
   2417            self.logger.warning("No browser binary provided.")
   2418            return None
   2419        output = call(binary, "--version")
   2420        if output:
   2421            version_string = output.strip()
   2422            match = re.match(r"Version (.*)", version_string)
   2423            if match:
   2424                return match.group(1)
   2425        return None
   2426 
   2427 class WebKitTestRunner(Browser):
   2428    """Interface for WebKitTestRunner.
   2429    """
   2430 
   2431    product = "wktr"
   2432    requirements = None
   2433 
   2434    def _find_apple_port_builds(self, channel="main"):
   2435        if channel != "main":
   2436            raise ValueError(f"unable to get builds for branch {channel}")
   2437 
   2438        system_version, _, _ = platform.mac_ver()
   2439        if system_version in SpecifierSet("==13.*"):
   2440            platform_key = "mac-ventura-x86_64%20arm64"
   2441        elif system_version in SpecifierSet("==12.*"):
   2442            platform_key = "mac-monterey-x86_64%20arm64"
   2443        else:
   2444            raise ValueError(
   2445                f"don't know what platform to use for macOS {system_version}"
   2446            )
   2447 
   2448        # This should match http://github.com/WebKit/WebKit/blob/main/Websites/webkit.org/wp-content/themes/webkit/build-archives.php
   2449        build_index = get(
   2450            f"https://q1tzqfy48e.execute-api.us-west-2.amazonaws.com/v3/latest/{platform_key}-release"
   2451        ).json()
   2452 
   2453        builds = []
   2454 
   2455        for entry in build_index["Items"]:
   2456            creation_time = datetime.fromtimestamp(
   2457                int(entry["creationTime"]["N"]), timezone.utc
   2458            )
   2459            identifier = entry["identifier"]["S"]
   2460            s3_url = entry["s3_url"]["S"]
   2461 
   2462            builds.append((s3_url, identifier, creation_time))
   2463 
   2464        return builds
   2465 
   2466    def _download_metadata_apple_port(self, channel="main"):
   2467        digit_re = re.compile("([0-9]+)")
   2468 
   2469        def natsort(string_to_split):
   2470            split = digit_re.split(string_to_split)
   2471            # this converts the split numbers into tuples so that "01" < "1"
   2472            split[1::2] = [(int(i), i) for i in split[1::2]]
   2473            return split
   2474 
   2475        builds = sorted(
   2476            self._find_apple_port_builds(channel),
   2477            key=lambda x: natsort(x[1]),
   2478            reverse=True,
   2479        )
   2480        latest_build = builds[0]
   2481 
   2482        return {
   2483            "url": latest_build[0],
   2484            "identifier": latest_build[1],
   2485            "creation_time": latest_build[2],
   2486        }
   2487 
   2488    def download(
   2489        self, dest=None, channel="main", rename=None, version=None, revision=None
   2490    ):
   2491        if platform.system() == "Darwin":
   2492            meta = self._download_metadata_apple_port(channel)
   2493        else:
   2494            raise ValueError("Unsupported platform")
   2495 
   2496        output_path = self.download_from_url(
   2497            meta["url"],
   2498            dest=dest,
   2499            channel=channel,
   2500            rename=rename,
   2501        )
   2502 
   2503        dest = os.path.dirname(output_path)  # This is the actual, used dest.
   2504 
   2505        self.last_revision_used = meta["identifier"]
   2506        with open(os.path.join(dest, "identifier"), "w") as f:
   2507            f.write(self.last_revision_used)
   2508 
   2509        return output_path
   2510 
   2511    def install(self, dest=None, channel="main"):
   2512        dest = self._get_browser_binary_dir(dest, channel)
   2513        installer_path = self.download(dest=dest, channel=channel)
   2514        self.logger.info(f"Extracting to {dest}")
   2515        with open(installer_path, "rb") as f:
   2516            unzip(f, dest)
   2517 
   2518    def install_webdriver(self, dest=None, channel="main", browser_binary=None):
   2519        raise NotImplementedError
   2520 
   2521    def find_binary(self, venv_path=None, channel="main"):
   2522        path = self._get_browser_binary_dir(venv_path, channel)
   2523        return which("WebKitTestRunner", path=os.path.join(path, "Release"))
   2524 
   2525    def find_webdriver(self, venv_path=None, channel="main"):
   2526        return None
   2527 
   2528    def version(self, binary=None, webdriver_binary=None):
   2529        dirname = os.path.dirname(binary)
   2530        identifier = os.path.join(dirname, "..", "identifier")
   2531        if not os.path.exists(identifier):
   2532            return None
   2533 
   2534        with open(identifier, "r") as f:
   2535            return f.read().strip()
   2536 
   2537 
   2538 class WebKitGlibBaseMiniBrowser(WebKit):
   2539    """WebKitGTK and WPE MiniBrowser specific interface (base class)."""
   2540 
   2541    # This class is not meant to be used directly.
   2542    # And the class variables below should be defined on the subclasses.
   2543    BASE_DOWNLOAD_URI = ""
   2544    PORT_PRETTY_NAME = ""
   2545    WEBDRIVER_BINARY_NAME = ""
   2546    LIBEXEC_SUBDIR_PREFIXES = [""]
   2547    product = ""
   2548 
   2549    def __init__(self, *args, **kwargs):
   2550        if self.__class__.__name__ == "WebKitGlibBaseMiniBrowser":
   2551            raise RuntimeError("class WebKitGlibBaseMiniBrowser should not be used directly, but subclassed")
   2552        for required_class_var in ["BASE_DOWNLOAD_URI", "PORT_PRETTY_NAME", "WEBDRIVER_BINARY_NAME", "LIBEXEC_SUBDIR_PREFIXES", "product"]:
   2553            class_var_value = getattr(self, required_class_var, "")
   2554            if all(len(i) == 0 for i in class_var_value):
   2555                raise NotImplementedError('subclass "%s" should define class variable "%s"' % (self.__class__.__name__, required_class_var))
   2556        return super().__init__(*args, **kwargs)
   2557 
   2558    def download(self, dest=None, channel=None, rename=None):
   2559        base_download_dir = self.BASE_DOWNLOAD_URI + platform.machine() + "/release/" + channel + "/MiniBrowser/"
   2560        try:
   2561            response = get(base_download_dir + "LAST-IS")
   2562        except requests.exceptions.HTTPError as e:
   2563            if e.response.status_code == 404:
   2564                raise RuntimeError("Can't find a %s MiniBrowser %s bundle for %s at %s"
   2565                                   % (self.PORT_PRETTY_NAME, channel, platform.machine(), self.BASE_DOWNLOAD_URI))
   2566            raise
   2567 
   2568        bundle_filename = response.text.strip()
   2569        bundle_url = base_download_dir + quote(bundle_filename)
   2570 
   2571        dest = self._get_browser_download_dir(dest, channel)
   2572        bundle_file_path = os.path.join(dest, bundle_filename)
   2573 
   2574        self.logger.info("Downloading %s MiniBrowser bundle from %s" % (self.PORT_PRETTY_NAME, bundle_url))
   2575        with open(bundle_file_path, "w+b") as f:
   2576            get_download_to_descriptor(f, bundle_url)
   2577 
   2578        ext_ndots = 2 if '.tar.' in bundle_filename else 1
   2579        bundle_filename_no_ext = '.'.join(bundle_filename.split('.')[:-ext_ndots])
   2580        bundle_hash_url = base_download_dir + bundle_filename_no_ext + ".sha256sum"
   2581        bundle_expected_hash = get(bundle_hash_url).text.strip().split(" ")[0]
   2582        bundle_computed_hash = sha256sum(bundle_file_path)
   2583 
   2584        if bundle_expected_hash != bundle_computed_hash:
   2585            self.logger.error("Calculated SHA256 hash is %s but was expecting %s" % (bundle_computed_hash, bundle_expected_hash))
   2586            raise RuntimeError("The %s MiniBrowser bundle at %s has incorrect SHA256 hash." % (self.PORT_PRETTY_NAME, bundle_file_path))
   2587        return bundle_file_path
   2588 
   2589    def install(self, dest=None, channel=None, prompt=True):
   2590        dest = self._get_browser_binary_dir(dest, channel)
   2591        bundle_path = self.download(dest, channel)
   2592        bundle_uncompress_directory = os.path.join(dest, self.product)
   2593 
   2594        # Clean it from previous runs
   2595        if os.path.exists(bundle_uncompress_directory):
   2596            rmtree(bundle_uncompress_directory)
   2597        os.mkdir(bundle_uncompress_directory)
   2598 
   2599        bundle_file_name = os.path.basename(bundle_path)
   2600        with open(bundle_path, "rb") as f:
   2601            if bundle_file_name.endswith(".zip"):
   2602                unzip(f, bundle_uncompress_directory)
   2603            elif ".tar." in bundle_file_name:
   2604                untar(f, bundle_uncompress_directory)
   2605            else:
   2606                raise NotImplementedError("Don't know how to install the file: %s" % bundle_file_name)
   2607        os.remove(bundle_path)
   2608 
   2609        for expected_binary in ["MiniBrowser", self.WEBDRIVER_BINARY_NAME]:
   2610            binary_path = os.path.join(bundle_uncompress_directory, expected_binary)
   2611            if not (os.path.isfile(binary_path) and os.access(binary_path, os.X_OK)):
   2612                raise RuntimeError("Can't find a %s binary at %s" % (expected_binary, binary_path))
   2613 
   2614        minibrowser_path = os.path.join(bundle_uncompress_directory, "MiniBrowser")
   2615        version_str = subprocess.check_output([minibrowser_path, "--version"]).decode("utf-8").strip()
   2616        self.logger.info("%s MiniBrowser bundle for channel %s installed: %s" % (self.PORT_PRETTY_NAME, channel, version_str))
   2617        install_ok_file = os.path.join(bundle_uncompress_directory, ".installation-ok")
   2618        open(install_ok_file, "w").close()  # touch
   2619        return minibrowser_path
   2620 
   2621    def _find_executable_in_channel_bundle(self, binary, venv_path=None, channel=None):
   2622        if venv_path:
   2623            venv_base_path = self._get_browser_binary_dir(venv_path, channel)
   2624            bundle_dir = os.path.join(venv_base_path, self.product)
   2625            install_ok_file = os.path.join(bundle_dir, ".installation-ok")
   2626            if os.path.isfile(install_ok_file):
   2627                return shutil.which(binary, path=bundle_dir)
   2628        return None
   2629 
   2630    def find_binary(self, venv_path=None, channel=None):
   2631        minibrowser_path = self._find_executable_in_channel_bundle("MiniBrowser", venv_path, channel)
   2632        if minibrowser_path:
   2633            self.logger.info("Found %s MiniBrowser %s at path: %s" % (self.PORT_PRETTY_NAME, channel, minibrowser_path))
   2634            return minibrowser_path
   2635 
   2636        # Find MiniBrowser on the system which is usually installed on the libexec dir
   2637        triplet = "x86_64-linux-gnu"
   2638        # Try to use GCC to detect this machine triplet
   2639        gcc = shutil.which("gcc")
   2640        if gcc:
   2641            try:
   2642                triplet = call(gcc, "-dumpmachine").strip()
   2643            except subprocess.CalledProcessError:
   2644                pass
   2645        for libexec_dir in ["/usr/libexec", f"/usr/lib/{triplet}", "/usr/lib"]:
   2646            if os.path.isdir(libexec_dir):
   2647                for libexec_entry in sorted(os.listdir(libexec_dir), reverse=True):
   2648                    for libexec_subdir_prefix in self.LIBEXEC_SUBDIR_PREFIXES:
   2649                        if libexec_entry.startswith(libexec_subdir_prefix):
   2650                            minibrowser_candidate_path = os.path.join(libexec_dir, libexec_entry, 'MiniBrowser')
   2651                            if os.path.isfile(minibrowser_candidate_path) and os.access(minibrowser_candidate_path, os.X_OK):
   2652                                self.logger.info("Found %s MiniBrowser at path: %s" % (self.PORT_PRETTY_NAME, minibrowser_candidate_path))
   2653                                return minibrowser_candidate_path
   2654        return None
   2655 
   2656    def find_webdriver(self, venv_path=None, channel=None):
   2657        webdriver_path = self._find_executable_in_channel_bundle(self.WEBDRIVER_BINARY_NAME, venv_path, channel)
   2658        if not webdriver_path:
   2659            webdriver_path = shutil.which(self.WEBDRIVER_BINARY_NAME)
   2660        if webdriver_path:
   2661            self.logger.info("Found %s WebDriver at path: %s" % (self.PORT_PRETTY_NAME, webdriver_path))
   2662        return webdriver_path
   2663 
   2664    def version(self, binary=None, webdriver_binary=None):
   2665        if binary is None:
   2666            return None
   2667        try:  # WebKitGTK MiniBrowser before 2.26.0 doesn't support --version
   2668            output = call(binary, "--version").strip()
   2669        except subprocess.CalledProcessError:
   2670            return None
   2671        # Example output: "WebKitGTK 2.26.1"
   2672        if output:
   2673            m = re.match(r"%s (.+)" % self.PORT_PRETTY_NAME, output)
   2674            if not m:
   2675                self.logger.warning("Failed to extract version from: %s" % output)
   2676                return None
   2677            return m.group(1)
   2678        return None
   2679 
   2680 
   2681 class WebKitGTKMiniBrowser(WebKitGlibBaseMiniBrowser):
   2682    """WebKitGTK MiniBrowser specific interface."""
   2683 
   2684    BASE_DOWNLOAD_URI = "https://webkitgtk.org/built-products/"
   2685    PORT_PRETTY_NAME = "WebKitGTK"
   2686    WEBDRIVER_BINARY_NAME = "WebKitWebDriver"
   2687    LIBEXEC_SUBDIR_PREFIXES = ["webkitgtk", "webkit2gtk"]
   2688    product = "webkitgtk_minibrowser"
   2689 
   2690 
   2691 class WPEWebKitMiniBrowser(WebKitGlibBaseMiniBrowser):
   2692    """WPE WebKit MiniBrowser specific interface."""
   2693 
   2694    BASE_DOWNLOAD_URI = "https://wpewebkit.org/built-products/"
   2695    PORT_PRETTY_NAME = "WPE WebKit"
   2696    WEBDRIVER_BINARY_NAME = "WPEWebDriver"
   2697    LIBEXEC_SUBDIR_PREFIXES = ["wpe-webkit"]
   2698    product = "wpewebkit_minibrowser"
   2699 
   2700 
   2701 class Epiphany(Browser):
   2702    """Epiphany-specific interface."""
   2703 
   2704    product = "epiphany"
   2705    requirements = None
   2706 
   2707    def download(self, dest=None, channel=None, rename=None):
   2708        raise NotImplementedError
   2709 
   2710    def install(self, dest=None, channel=None):
   2711        raise NotImplementedError
   2712 
   2713    def find_binary(self, venv_path=None, channel=None):
   2714        return which("epiphany")
   2715 
   2716    def find_webdriver(self, venv_path=None, channel=None):
   2717        return which("WebKitWebDriver")
   2718 
   2719    def install_webdriver(self, dest=None, channel=None, browser_binary=None):
   2720        raise NotImplementedError
   2721 
   2722    def version(self, binary=None, webdriver_binary=None):
   2723        if binary is None:
   2724            return None
   2725        output = call(binary, "--version")
   2726        if output:
   2727            # Stable release output looks like: "Web 3.30.2"
   2728            # Tech Preview output looks like "Web 3.31.3-88-g97db4f40f"
   2729            return output.split()[1]
   2730        return None