tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

mercurial.py (15668B)


      1 # This Source Code Form is subject to the terms of the Mozilla Public
      2 # License, v. 2.0. If a copy of the MPL was not distributed with this,
      3 # file, You can obtain one at http://mozilla.org/MPL/2.0/.
      4 
      5 import errno
      6 import os
      7 import re
      8 import shutil
      9 import subprocess
     10 from contextlib import contextmanager
     11 from datetime import datetime
     12 from pathlib import Path
     13 from typing import Optional, Union
     14 
     15 from mozpack.files import FileListFinder
     16 
     17 from mozversioncontrol.errors import (
     18    CannotDeleteFromRootOfRepositoryException,
     19    MissingVCSExtension,
     20 )
     21 from mozversioncontrol.repo.base import Repository
     22 
     23 
     24 class HgRepository(Repository):
     25    """An implementation of `Repository` for Mercurial repositories."""
     26 
     27    def __init__(self, path: Path, hg="hg"):
     28        import hglib.client
     29 
     30        super().__init__(path, tool=hg)
     31        self._env["HGPLAIN"] = "1"
     32 
     33        # Setting this modifies a global variable and makes all future hglib
     34        # instances use this binary. Since the tool path was validated, this
     35        # should be OK. But ideally hglib would offer an API that defines
     36        # per-instance binaries.
     37        hglib.HGPATH = str(self._tool)
     38 
     39        # Without connect=False this spawns a persistent process. We want
     40        # the process lifetime tied to a context manager.
     41        self._client = hglib.client.hgclient(
     42            self.path, encoding="UTF-8", configs=None, connect=False
     43        )
     44 
     45    @property
     46    def name(self):
     47        return "hg"
     48 
     49    @property
     50    def head_ref(self):
     51        return self._run("log", "-r", ".", "-T", "{node}")
     52 
     53    def is_cinnabar_repo(self) -> bool:
     54        return False
     55 
     56    @property
     57    def base_ref(self):
     58        return self._run("log", "-r", "last(ancestors(.) and public())", "-T", "{node}")
     59 
     60    def base_ref_as_hg(self):
     61        return self.base_ref
     62 
     63    def base_ref_as_commit(self):
     64        raise Exception("unimplemented: convert hg rev to git rev")
     65 
     66    @property
     67    def branch(self):
     68        bookmarks_fn = Path(self.path) / ".hg" / "bookmarks.current"
     69        if bookmarks_fn.exists():
     70            with open(bookmarks_fn) as f:
     71                bookmark = f.read()
     72                return bookmark or None
     73 
     74        return None
     75 
     76    def __enter__(self):
     77        if self._client.server is None:
     78            # The cwd if the spawned process should be the repo root to ensure
     79            # relative paths are normalized to it.
     80            old_cwd = Path.cwd()
     81            try:
     82                os.chdir(self.path)
     83                self._client.open()
     84            finally:
     85                os.chdir(old_cwd)
     86 
     87        return self
     88 
     89    def __exit__(self, exc_type, exc_val, exc_tb):
     90        self._client.close()
     91 
     92    def _run(self, *args, **runargs):
     93        if not self._client.server:
     94            return super()._run(*args, **runargs)
     95 
     96        # hglib requires bytes on python 3
     97        args = [a.encode("utf-8") if not isinstance(a, bytes) else a for a in args]
     98        return self._client.rawcommand(args).decode("utf-8")
     99 
    100    def get_commit_time(self):
    101        newest_public_revision_time = self._run(
    102            "log",
    103            "--rev",
    104            "heads(ancestors(.) and not draft())",
    105            "--template",
    106            "{word(0, date|hgdate)}",
    107            "--limit",
    108            "1",
    109        ).strip()
    110 
    111        if not newest_public_revision_time:
    112            raise RuntimeError(
    113                "Unable to find a non-draft commit in this hg "
    114                "repository. If you created this repository from a "
    115                'bundle, have you done a "hg pull" from hg.mozilla.org '
    116                "since?"
    117            )
    118 
    119        return int(newest_public_revision_time)
    120 
    121    def sparse_checkout_present(self):
    122        # We assume a sparse checkout is enabled if the .hg/sparse file
    123        # has data. Strictly speaking, we should look for a requirement in
    124        # .hg/requires. But since the requirement is still experimental
    125        # as of Mercurial 4.3, it's probably more trouble than its worth
    126        # to verify it.
    127        sparse = Path(self.path) / ".hg" / "sparse"
    128 
    129        try:
    130            st = sparse.stat()
    131            return st.st_size > 0
    132        except OSError as e:
    133            if e.errno != errno.ENOENT:
    134                raise
    135 
    136            return False
    137 
    138    def get_user_email(self):
    139        # Output is in the form "First Last <flast@mozilla.com>"
    140        username = self._run("config", "ui.username", return_codes=[0, 1])
    141        if not username:
    142            # No username is set
    143            return None
    144        match = re.search(r"<(.*)>", username)
    145        if not match:
    146            # "ui.username" doesn't follow the "Full Name <email@domain>" convention
    147            return None
    148        return match.group(1)
    149 
    150    def _format_diff_filter(self, diff_filter, for_status=False):
    151        df = diff_filter.lower()
    152        assert all(f in self._valid_diff_filter for f in df)
    153 
    154        # When looking at the changes in the working directory, the hg status
    155        # command uses 'd' for files that have been deleted with a non-hg
    156        # command, and 'r' for files that have been `hg rm`ed. Use both.
    157        return df.replace("d", "dr") if for_status else df
    158 
    159    def _files_template(self, diff_filter):
    160        template = ""
    161        df = self._format_diff_filter(diff_filter)
    162        if "a" in df:
    163            template += "{file_adds % '{file}\\n'}"
    164        if "d" in df:
    165            template += "{file_dels % '{file}\\n'}"
    166        if "m" in df:
    167            template += "{file_mods % '{file}\\n'}"
    168        return template
    169 
    170    def get_changed_files(self, diff_filter="ADM", mode="unstaged", rev=None):
    171        if rev is None:
    172            # Use --no-status to print just the filename.
    173            df = self._format_diff_filter(diff_filter, for_status=True)
    174            return self._run("status", "--no-status", f"-{df}").splitlines()
    175        else:
    176            template = self._files_template(diff_filter)
    177            return self._run("log", "-r", rev, "-T", template).splitlines()
    178 
    179    def get_outgoing_files(self, diff_filter="ADM", upstream=None):
    180        template = self._files_template(diff_filter)
    181 
    182        if not upstream:
    183            return self._run(
    184                "log", "-r", "draft() and ancestors(.)", "--template", template
    185            ).split()
    186 
    187        return self._run(
    188            "outgoing",
    189            "-r",
    190            ".",
    191            "--quiet",
    192            "--template",
    193            template,
    194            upstream,
    195            return_codes=(1,),
    196        ).split()
    197 
    198    def add_remove_files(self, *paths: Union[str, Path], force: bool = False):
    199        if not paths:
    200            return
    201 
    202        paths = [str(path) for path in paths]
    203 
    204        args = ["addremove"] + paths
    205        m = re.search(r"\d+\.\d+", self.tool_version)
    206        simplified_version = float(m.group(0)) if m else 0
    207        if simplified_version >= 3.9:
    208            args = ["--config", "extensions.automv="] + args
    209        self._run(*args)
    210 
    211    def forget_add_remove_files(self, *paths: Union[str, Path]):
    212        if not paths:
    213            return
    214 
    215        paths = [str(path) for path in paths]
    216 
    217        self._run("forget", *paths)
    218 
    219    def get_tracked_files_finder(self, path=None):
    220        # Can return backslashes on Windows. Normalize to forward slashes.
    221        files = list(
    222            p.replace("\\", "/") for p in self._run("files", "-0").split("\0") if p
    223        )
    224        return FileListFinder(files)
    225 
    226    def get_ignored_files_finder(self):
    227        # Can return backslashes on Windows. Normalize to forward slashes.
    228        files = list(
    229            p.replace("\\", "/").split(" ")[-1]
    230            for p in self._run("status", "-i").split("\n")
    231            if p
    232        )
    233        return FileListFinder(files)
    234 
    235    def diff_stream(self, rev=None, extensions=(), exclude_file=None, context=8):
    236        args = ["diff", f"-U{context}"]
    237        if rev:
    238            args += ["-c", rev]
    239        else:
    240            args += ["-r", ".^"]
    241        for dot_extension in extensions:
    242            args += ["--include", f"glob:**{dot_extension}"]
    243        if exclude_file is not None:
    244            args += ["--exclude", f"listfile:{exclude_file}"]
    245        return self._pipefrom(*args)
    246 
    247    def working_directory_clean(self, untracked=False, ignored=False):
    248        args = ["status", "--modified", "--added", "--removed", "--deleted"]
    249        if untracked:
    250            args.append("--unknown")
    251        if ignored:
    252            args.append("--ignored")
    253 
    254        # If output is empty, there are no entries of requested status, which
    255        # means we are clean.
    256        return not len(self._run(*args).strip())
    257 
    258    def clean_directory(self, path: Union[str, Path]):
    259        if Path(self.path).samefile(path):
    260            raise CannotDeleteFromRootOfRepositoryException()
    261        self._run("revert", str(path))
    262        for single_path in self._run("st", "-un", str(path)).splitlines():
    263            single_path = Path(single_path)
    264            if single_path.is_file():
    265                single_path.unlink()
    266            else:
    267                shutil.rmtree(str(single_path))
    268 
    269    def update(self, ref):
    270        return self._run("update", "--check", ref)
    271 
    272    def raise_for_missing_extension(self, extension: str):
    273        """Raise `MissingVCSExtension` if `extension` is not installed and enabled."""
    274        try:
    275            self._run("showconfig", f"extensions.{extension}")
    276        except subprocess.CalledProcessError:
    277            raise MissingVCSExtension(extension)
    278 
    279    def push_to_try(
    280        self,
    281        message: str,
    282        changed_files: dict[str, str] = {},
    283        allow_log_capture: bool = False,
    284    ):
    285        if changed_files:
    286            self.stage_changes(changed_files)
    287 
    288        try:
    289            cmd = (str(self._tool), "push-to-try", "-m", message)
    290            if allow_log_capture:
    291                self._push_to_try_with_log_capture(
    292                    cmd,
    293                    {
    294                        "stdout": subprocess.PIPE,
    295                        "stderr": subprocess.PIPE,
    296                        "cwd": self.path,
    297                        "env": self._env,
    298                        "universal_newlines": True,
    299                        "bufsize": 1,
    300                    },
    301                )
    302            else:
    303                subprocess.check_call(
    304                    cmd,
    305                    cwd=self.path,
    306                    env=self._env,
    307                )
    308        except subprocess.CalledProcessError:
    309            self.raise_for_missing_extension("push-to-try")
    310            raise
    311        finally:
    312            self._run("revert", "-a")
    313 
    314    def get_commits(
    315        self,
    316        head: Optional[str] = None,
    317        base_ref: Optional[str] = None,
    318        limit: Optional[int] = None,
    319        follow: Optional[list[str]] = None,
    320    ) -> list[str]:
    321        """Return a list of commit SHAs for nodes on the current branch."""
    322        if not base_ref:
    323            base_ref = self.base_ref
    324 
    325        head_ref = head or self.head_ref
    326 
    327        cmd = [
    328            "log",
    329            "-r",
    330            f"{base_ref}::{head_ref} and not {base_ref}",
    331            "-T",
    332            "{node}\n",
    333        ]
    334        if limit is not None:
    335            cmd.append(f"-l{limit}")
    336        if follow is not None:
    337            cmd += ["-f", "--", *follow]
    338 
    339        return self._run(*cmd).splitlines()
    340 
    341    def get_commit_patches(self, nodes: list[str]) -> list[bytes]:
    342        """Return the contents of the patch `node` in the VCS' standard format."""
    343        # Running `hg export` once for each commit in a large stack is
    344        # slow, so instead we run it once and parse the output for each
    345        # individual patch.
    346        args = ["export"]
    347 
    348        for node in nodes:
    349            args.extend(("-r", node))
    350 
    351        output = self._run(*args, encoding=None)
    352 
    353        patches = []
    354 
    355        current_patch = []
    356        for i, line in enumerate(output.splitlines(keepends=True)):
    357            if i != 0 and line.rstrip() == b"# HG changeset patch":
    358                # When we see the first line of a new patch, add the patch we have been
    359                # building to the patches list and start building a new patch.
    360                patches.append(b"".join(current_patch))
    361                current_patch = [line]
    362            else:
    363                # Add a new line to the patch being built.
    364                current_patch.append(line)
    365 
    366        # Add the last patch to the stack.
    367        patches.append(b"".join(current_patch))
    368 
    369        return patches
    370 
    371    @contextmanager
    372    def try_commit(
    373        self, commit_message: str, changed_files: Optional[dict[str, str]] = None
    374    ):
    375        """Create a temporary try commit as a context manager.
    376 
    377        Create a new commit using `commit_message` as the commit message. The commit
    378        may be empty, for example when only including try syntax.
    379 
    380        `changed_files` may contain a dict of file paths and their contents,
    381        see `stage_changes`.
    382        """
    383        if changed_files:
    384            self.stage_changes(changed_files)
    385 
    386        # Allow empty commit messages in case we only use try-syntax.
    387        self._run("--config", "ui.allowemptycommit=1", "commit", "-m", commit_message)
    388 
    389        yield self.head_ref
    390 
    391        try:
    392            self._run("prune", ".")
    393        except subprocess.CalledProcessError:
    394            # The `evolve` extension is required for `uncommit` and `prune`.
    395            self.raise_for_missing_extension("evolve")
    396            raise
    397 
    398    def get_last_modified_time_for_file(self, path: Path):
    399        """Return last modified in VCS time for the specified file."""
    400        out = self._run(
    401            "log",
    402            "--template",
    403            "{date|isodatesec}",
    404            "--limit",
    405            "1",
    406            "--follow",
    407            str(path),
    408        )
    409 
    410        return datetime.strptime(out.strip(), "%Y-%m-%d %H:%M:%S %z")
    411 
    412    def _update_mercurial_repo(self, url, dest: Path, revision):
    413        """Perform a clone/pull + update of a Mercurial repository."""
    414        # Disable common extensions whose older versions may cause `hg`
    415        # invocations to abort.
    416        pull_args = [self._tool]
    417        if dest.exists():
    418            pull_args.extend(["pull", url])
    419            cwd = dest
    420        else:
    421            pull_args.extend(["clone", "--noupdate", url, str(dest)])
    422            cwd = "/"
    423 
    424        update_args = [self._tool, "update", "-r", revision]
    425 
    426        print("=" * 80)
    427        print(f"Ensuring {url} is up to date at {dest}")
    428 
    429        env = os.environ.copy()
    430        env.update({
    431            "HGPLAIN": "1",
    432            "HGRCPATH": "!",
    433        })
    434 
    435        try:
    436            subprocess.check_call(pull_args, cwd=str(cwd), env=env)
    437            subprocess.check_call(update_args, cwd=str(dest), env=env)
    438        finally:
    439            print("=" * 80)
    440 
    441    def _update_vct(self, root_state_dir: Path):
    442        """Ensure version-control-tools in the state directory is up to date."""
    443        vct_dir = root_state_dir / "version-control-tools"
    444 
    445        # Ensure the latest revision of version-control-tools is present.
    446        self._update_mercurial_repo(
    447            "https://hg.mozilla.org/hgcustom/version-control-tools", vct_dir, "@"
    448        )
    449 
    450        return vct_dir
    451 
    452    def configure(self, state_dir: Path, update_only: bool = False):
    453        """Run the Mercurial configuration wizard."""
    454        vct_dir = self._update_vct(state_dir)
    455 
    456        # Run the config wizard from v-c-t.
    457        args = [
    458            self._tool,
    459            "--config",
    460            f"extensions.configwizard={vct_dir}/hgext/configwizard",
    461            "configwizard",
    462        ]
    463        if update_only:
    464            args += ["--config", "configwizard.steps="]
    465        subprocess.call(args)