commit c501de0b97c9ce2b0b56e67a95d1f7234a945cd8
parent 546ba1cffeb0287aedf480e98e81a0b21f517653
Author: Henry Wilkes <henry@torproject.org>
Date: Wed, 31 May 2023 16:04:01 +0100
BB 41803: Add some developer tools for working on tor-browser.
Diffstat:
3 files changed, 1822 insertions(+), 0 deletions(-)
diff --git a/tools/base_browser/git-rebase-fixup-preprocessor b/tools/base_browser/git-rebase-fixup-preprocessor
@@ -0,0 +1,95 @@
+#!/usr/bin/python
+"""
+Pre-process a git todo file before passing it on to an editor.
+"""
+
+import sys
+import os
+import subprocess
+import re
+
+EDITOR_ENV_NAME = "GIT_REBASE_FIXUP_PREPROCESSOR_USER_EDITOR"
+
+try:
+ editor = os.environ[EDITOR_ENV_NAME]
+except KeyError:
+ print(f"Missing {EDITOR_ENV_NAME} in environment", file=sys.stderr)
+ exit(1)
+
+if len(sys.argv) < 2:
+ print("Missing filename argument", file=sys.stderr)
+ exit(1)
+
+filename = sys.argv[1]
+
+
+class TodoLine:
+ """
+ Represents a line in the git todo file.
+ """
+
+ # git 2.50 adds a '#' between the commit hash and the commit subject.
+ # Keep this '#' optional for previous git versions.
+ _PICK_REGEX = re.compile(r"^pick [a-f0-9]+ +(?:# +)?(?P<fixups>(fixup! )*)(?P<title>.*)")
+
+ def __init__(self, line):
+ """
+ Create a new line with the given text content.
+ """
+ self._line = line
+ self._make_fixup = False
+
+ match = self._PICK_REGEX.match(line)
+ if match:
+ self._is_pick = True
+ self._num_fixups = len(match.group("fixups")) / len("fixup! ")
+ self._title = match.group("title")
+ else:
+ self._is_pick = False
+ self._num_fixups = False
+ self._title = None
+
+ def add_to_list_try_fixup(self, existing_lines):
+ """
+ Add the TodoLine to the given list of other TodoLine, trying to fix up
+ one of the existing lines.
+ """
+ if not self._num_fixups: # Not a fixup line.
+ existing_lines.append(self)
+ return
+
+ # Search from the end of the list upwards.
+ for index in reversed(range(len(existing_lines))):
+ other = existing_lines[index]
+ if (
+ other._is_pick
+ and self._num_fixups == other._num_fixups + 1
+ and other._title == self._title
+ ):
+ self._make_fixup = True
+ existing_lines.insert(index + 1, self)
+ return
+
+ # No line found to fixup.
+ existing_lines.append(self)
+
+ def get_text(self):
+ """
+ Get the text for the line to save.
+ """
+ line = self._line
+ if self._make_fixup:
+ line = line.replace("pick", "fixup", 1)
+ return line
+
+
+todo_lines = []
+with open(filename, "r", encoding="utf8") as todo_file:
+ for line in todo_file:
+ TodoLine(line).add_to_list_try_fixup(todo_lines)
+
+with open(filename, "w", encoding="utf8") as todo_file:
+ for line in todo_lines:
+ todo_file.write(line.get_text())
+
+exit(subprocess.run([editor, *sys.argv[1:]], check=False).returncode)
diff --git a/tools/base_browser/missing-css-variables.py b/tools/base_browser/missing-css-variables.py
@@ -0,0 +1,88 @@
+"""
+Simple tool for checking for missing CSS declarations.
+
+Should be run from the root directory and passed a single CSS file path to
+check the variables for.
+
+Missing variables will be printed to stdout, if any. Variables are considered
+missing if they are not declared in the same file or in one of the expected
+CSS files. CSS variables that are declared in javascript an unexpected CSS file
+will not be found.
+
+Exits with 0 if no variables are missing. Otherwise exits with 1.
+"""
+
+import re
+import sys
+from pathlib import Path
+
+declare_dirs = [
+ Path("browser/branding"),
+ Path("browser/themes"),
+ Path("toolkit/themes"),
+]
+
+var_dec_regex = re.compile(r"^\s*(?P<name>--[\w_-]+)\s*:")
+var_use_regex = re.compile(r":.*(?P<name>--[\w_-]+)")
+
+
+def remove_vars_in_file(var_set: set[str], file_path: Path) -> bool:
+ """
+ Checks the CSS file for declarations of the given variables and removes
+ them.
+
+ :param var_set: The set of CSS variables to check and remove.
+ :param file_path: The path to a CSS file to check within.
+ :returns: Whether the variable set is now empty.
+ """
+ with file_path.open() as file:
+ for line in file:
+ var_dec_match = var_dec_regex.match(line)
+ if not var_dec_match:
+ continue
+ var_name = var_dec_match.group("name")
+ if var_name in var_set:
+ print(f"{var_name} declared in {file_path}", file=sys.stderr)
+ var_set.remove(var_name)
+ if not var_set:
+ return True
+ return False
+
+
+def find_missing(file_path: Path) -> set[str]:
+ """
+ Search for CSS variables in the CSS file and check whether any are missing
+ known declarations.
+
+ :param file_path: The path of the CSS file to check.
+ :returns: The names of the missing variables.
+ """
+ used_vars: set[str] = set()
+
+ with open(file_path) as file:
+ for line in file:
+ for match in var_use_regex.finditer(line):
+ used_vars.add(match.group("name"))
+
+ if not used_vars:
+ print("No CSS variables found", file=sys.stderr)
+ return used_vars
+
+ # Remove any CSS variables that are declared within the same file.
+ if remove_vars_in_file(used_vars, file_path):
+ return used_vars
+
+ # And remove any that are in the expected declaration files.
+ for top_dir in declare_dirs:
+ for css_file_path in top_dir.rglob("*.css"):
+ if remove_vars_in_file(used_vars, css_file_path):
+ return used_vars
+
+ return used_vars
+
+
+missing_vars = find_missing(Path(sys.argv[1]))
+for var_name in missing_vars:
+ print(var_name)
+
+sys.exit(1 if missing_vars else 0)
diff --git a/tools/base_browser/tb-dev b/tools/base_browser/tb-dev
@@ -0,0 +1,1639 @@
+#!/usr/bin/env python3
+# PYTHON_ARGCOMPLETE_OK
+"""
+Useful tools for working on tor-browser repository.
+"""
+
+import argparse
+import atexit
+import functools
+import json
+import os
+import re
+import subprocess
+import sys
+import tempfile
+import termios
+import urllib.request
+from collections.abc import Callable, Iterable, Iterator
+from types import ModuleType
+from typing import Any, NotRequired, TypedDict, TypeVar
+
+argcomplete: None | ModuleType = None
+try:
+ import argcomplete
+except ImportError:
+ pass
+
+GIT_PATH = "/usr/bin/git"
+UPSTREAM_URLS = {
+ "tor-browser": [
+ "https://gitlab.torproject.org/tpo/applications/tor-browser.git",
+ "git@gitlab.torproject.org:tpo/applications/tor-browser.git",
+ ],
+ "mullvad-browser": [
+ "https://gitlab.torproject.org/tpo/applications/mullvad-browser.git",
+ "git@gitlab.torproject.org:tpo/applications/mullvad-browser.git",
+ ],
+}
+FIXUP_PREPROCESSOR_EDITOR = "git-rebase-fixup-preprocessor"
+USER_EDITOR_ENV_NAME = "GIT_REBASE_FIXUP_PREPROCESSOR_USER_EDITOR"
+
+
+class TbDevException(Exception):
+ pass
+
+
+def git_run(
+ args: list[str], check: bool = True, env: None | dict[str, str] = None
+) -> None:
+ """
+ Run a git command with output sent to stdout.
+ :param args: The arguments to pass to git.
+ :param check: Whether to check for success.
+ :param env: Optional environment to set.
+ """
+ if env is not None:
+ tmp_env = dict(os.environ)
+ for key, value in env.items():
+ tmp_env[key] = value
+ env = tmp_env
+ try:
+ subprocess.run([GIT_PATH, *args], check=check, env=env)
+ except subprocess.CalledProcessError as err:
+ raise TbDevException(str(err)) from err
+
+
+def git_run_pager(
+ args: list[str] | None = None,
+ arg_sequence: Iterable[list[str]] | None = None,
+ pager_prefix: None | str = None,
+) -> None:
+ """
+ Run a sequence of git commands with the output concatenated and sent to the
+ git pager.
+ :param args: The arguments to pass to git, or `None` if a sequence is desired.
+ :param arg_sequence: A sequence representing several git commands.
+ :param pager_prefix: An optional text to send to the pager first.
+ """
+ if arg_sequence is None:
+ if args is not None:
+ arg_sequence = (args,)
+ else:
+ raise ValueError("Missing `arg_sequence` or `args`")
+ elif args is not None:
+ raise ValueError("Unexpected both args and arg_sequence")
+
+ pager = git_get(["var", "GIT_PAGER"])
+ if not pager:
+ raise TbDevException("Missing a GIT_PAGER")
+ command = [pager]
+ if os.path.basename(pager) == "less":
+ # Show colours.
+ command.append("-R")
+
+ pager_process = subprocess.Popen(command, stdin=subprocess.PIPE, text=True)
+ assert pager_process.stdin is not None
+
+ if pager_prefix is not None:
+ pager_process.stdin.write(pager_prefix)
+ pager_process.stdin.flush()
+
+ for git_args in arg_sequence:
+ subprocess.run(
+ [GIT_PATH, "--no-pager", *git_args], check=False, stdout=pager_process.stdin
+ )
+
+ pager_process.stdin.close()
+
+ status = pager_process.wait()
+ if status != 0:
+ raise TbDevException(f"git pager {pager} exited with status {status}")
+
+
+def git_get(args: list[str], strip: bool = True, check: bool = True) -> str:
+ """
+ Return the output from a git command.
+ :param args: The arguments to send to git.
+ :param strip: Whether to strip the whitespace from the output.
+ :param check: Whether to check for success.
+ :returns: The stdout.
+ """
+ try:
+ git_process = subprocess.run(
+ [GIT_PATH, *args], text=True, stdout=subprocess.PIPE, check=check
+ )
+ except subprocess.CalledProcessError as err:
+ raise TbDevException(str(err)) from err
+ ret = git_process.stdout
+ if strip:
+ ret = ret.strip()
+ return ret
+
+
+def git_lines(args: list[str]) -> Iterator[str]:
+ """
+ Yields the non-empty lines returned by the git command.
+ :param args: The arguments to send to git.
+ :yield: The lines.
+ """
+ for line in git_get(args, strip=False).split("\n"):
+ if not line:
+ continue
+ yield line
+
+
+def git_path_args(path_iter: Iterable[str]) -> Iterator[str]:
+ """
+ Generate the trailing arguments to specify paths in git commands, includes
+ the "--" separator just before the paths.
+ :param path_iter: The paths that should be passed in.
+ :yields: The git arguments.
+ """
+ yield "--"
+ for path in path_iter:
+ yield f":(literal){path}"
+
+
+@functools.cache
+def get_local_root() -> str:
+ """
+ Get the path for the tor-browser root directory.
+ :returns: The local root.
+ """
+ try:
+ # Make sure we have a matching remote in this git repository.
+ if get_upstream_details()["is-browser-repo"] == "True":
+ return git_get(["rev-parse", "--show-toplevel"])
+ else:
+ return ""
+ except TbDevException:
+ return ""
+
+
+@functools.cache
+def get_upstream_details() -> dict[str, str]:
+ """
+ Get details about the upstream repository.
+ :returns: The details.
+ """
+ remote_urls = {
+ remote: git_get(["remote", "get-url", remote])
+ for remote in git_lines(["remote"])
+ }
+
+ matches = {
+ remote: repo
+ for repo, url_list in UPSTREAM_URLS.items()
+ for url in url_list
+ for remote, fetch_url in remote_urls.items()
+ if fetch_url == url
+ }
+
+ is_browser_repo = len(matches) > 0
+ details = {"is-browser-repo": str(is_browser_repo)}
+
+ origin_remote_repo = matches.get("origin", None)
+ upstream_remote_repo = matches.get("upstream", None)
+
+ if origin_remote_repo is not None:
+ if upstream_remote_repo is None:
+ details["remote"] = "origin"
+ details["repo-name"] = origin_remote_repo
+ # Else, both "upstream" and "origin" point to a remote repo. Not clear
+ # which should be used.
+ elif upstream_remote_repo is not None:
+ details["remote"] = "upstream"
+ details["repo-name"] = upstream_remote_repo
+ elif len(matches) == 1:
+ remote = next(iter(matches.keys()))
+ details["remote"] = remote
+ details["repo-name"] = matches[remote]
+ # Else, the upstream is ambiguous.
+
+ return details
+
+
+class Reference:
+ """Represents a git reference to a commit."""
+
+ _REFS_REGEX = re.compile(r"refs/[a-z]+/")
+
+ def __init__(self, full_name: str, commit: str) -> None:
+ """
+ :param full_name: The full reference name. E.g. "refs/tags/MyTag".
+ :param commit: The commit hash for the commit this reference points to.
+ """
+ match = self.__class__._REFS_REGEX.match(full_name)
+ if not match:
+ raise ValueError(f"Invalid reference name {full_name}")
+ self.full_name = full_name
+ self.name = full_name[match.end() :]
+ self.commit = commit
+
+
+def get_refs(ref_type: str, name_start: str) -> Iterator[Reference]:
+ """
+ Get a list of references that match the given conditions.
+ :param ref_type: The ref type to search for ("tag" or "remote" or "head").
+ :param name_start: The ref name start to match against.
+ :yield: The matching references.
+ """
+ if ref_type == "tag":
+ ref_start = "refs/tags/"
+ elif ref_type == "remote":
+ ref_start = "refs/remotes/"
+ elif ref_type == "head":
+ ref_start = "refs/heads/"
+ else:
+ raise TypeError(f"Unknown type {ref_type}")
+
+ fstring = "%(*objectname),%(objectname),%(refname)"
+ pattern = f"{ref_start}{name_start}**"
+
+ def line_to_ref(line: str) -> Reference:
+ [objectname_reference, objectname, ref_name] = line.split(",", 2)
+ # For annotated tags, the objectname_reference is non-empty and points
+ # to an actual commit.
+ # For remotes, heads and lightweight tags, the objectname_reference will
+ # be empty and objectname will point directly to the commit.
+ return Reference(ref_name, objectname_reference or objectname)
+
+ return (
+ line_to_ref(line)
+ for line in git_lines(["for-each-ref", f"--format={fstring}", pattern])
+ )
+
+
+def get_firefox_ref(search_from: str) -> Reference:
+ """
+ Search for the commit that comes from firefox.
+ :param search_from: The commit to search backwards from.
+ :returns: The firefox reference.
+ """
+ # Only search a limited history that should include the FIREFOX_ tag.
+ search_commits = [c for c in git_lines(["rev-list", "-1000", search_from])]
+
+ firefox_tag_prefix = "FIREFOX_"
+
+ existing_tags = list(get_refs("tag", firefox_tag_prefix))
+ for commit in search_commits:
+ for ref in existing_tags:
+ if commit == ref.commit:
+ return ref
+
+ # Might just need to fetch tags from the remote.
+ upstream = get_upstream_details().get("remote", None)
+ if upstream:
+ remote_ref: None | Reference = None
+ search_index = len(search_commits)
+ # Search the remote for a tag that is in our history.
+ # We want to avoid triggering a long fetch, so we just want to grab the
+ # tag that already points to a commit in our history.
+ for line in git_lines(
+ ["ls-remote", upstream, f"refs/tags/{firefox_tag_prefix}*"]
+ ):
+ objectname, name = line.split("\t", 1)
+ for index in range(search_index):
+ if search_commits[index] == objectname:
+ # Remove trailing "^{}" for commits pointed to by
+ # annotated tags.
+ remote_ref = Reference(re.sub(r"\^\{\}$", "", name), objectname)
+ # Only continue to search for references that are even
+ # closer to `search_from`.
+ search_index = index
+ break
+ if remote_ref is not None:
+ # Get a local copy of just this tag.
+ git_run(["fetch", "--no-tags", upstream, "tag", remote_ref.name])
+ return ref
+
+ raise TbDevException("Unable to find FIREFOX_ tag")
+
+
+def get_upstream_tracking_branch(search_from: str) -> str:
+ """
+ :param search_from: The commit reference.
+ :returns: The upstream branch reference name.
+ """
+ return git_get(["rev-parse", "--abbrev-ref", f"{search_from}@{{upstream}}"])
+
+
+def get_upstream_basis_commit(search_from: str) -> str:
+ """
+ Get the first common ancestor of search_from that is also in its upstream
+ branch.
+ :param search_from: The commit reference.
+ :returns: The upstream commit hash.
+ """
+ upstream_branch = get_upstream_tracking_branch(search_from)
+ commit = git_get(["merge-base", search_from, upstream_branch])
+ # Verify that the upstream commit shares the same firefox basis. Otherwise,
+ # this would indicate that the upstream is on an early or later FIREFOX
+ # base.
+ upstream_firefox = get_firefox_ref(upstream_branch).commit
+ search_firefox = get_firefox_ref(search_from).commit
+ if upstream_firefox != search_firefox:
+ raise TbDevException(
+ f"Upstream of {search_from} has a different FIREFOX base. "
+ "You might want to set the upstream tracking branch to a new value."
+ )
+ return commit
+
+
+class FileChange:
+ """Represents a git change to a commit."""
+
+ def __init__(self, status: str, path: str, new_path: str) -> None:
+ """
+ :param status: The file change status used within git diff. E.g. "M" for
+ modified, or "D" for deleted.
+ :param path: The source file path.
+ :param new_path: The file path after the change.
+ """
+ self.status = status
+ self.path = path
+ self.new_path = new_path
+
+
+RAW_DIFF_PATH_PATTERN = r"(?P<path>[^\0]*)\0"
+RAW_DIFF_LINE_REGEX = re.compile(
+ r":[0-7]+ [0-7]+ [0-9a-f]+ [0-9a-f]+ (?P<status>[ADMTUXRC])[0-9]*\0"
+ + RAW_DIFF_PATH_PATTERN
+)
+RAW_DIFF_PATH_REGEX = re.compile(RAW_DIFF_PATH_PATTERN)
+
+
+def parse_raw_diff_line(raw_output: str) -> tuple[FileChange, int]:
+ """
+ Parse the --raw diff output from git.
+ :param raw_output: The raw output.
+ :returns: The change for this line, and the offset for the end of the raw
+ diff line.
+ """
+ match = RAW_DIFF_LINE_REGEX.match(raw_output)
+ if not match:
+ raise ValueError(f"Invalid raw output: {raw_output[:50]}...")
+ path = os.path.relpath(os.path.join(get_local_root(), match.group("path")))
+ status = match.group("status")
+ if status in ("R", "C"):
+ match = RAW_DIFF_PATH_REGEX.match(raw_output, pos=match.end())
+ if not match:
+ raise ValueError(f"Invalid raw output for rename: {raw_output[:50]}...")
+ new_path = os.path.relpath(os.path.join(get_local_root(), match.group("path")))
+ else:
+ new_path = path
+
+ return FileChange(status, path, new_path), match.end()
+
+
+def get_changed_files(
+ from_commit: None | str = None, staged: bool = False
+) -> Iterator[FileChange]:
+ """
+ Get a list of file changes relative to the current working directory that have
+ been changed since 'from_commit' (non-inclusive).
+ :param from_commit: The commit to compare against, otherwise use the git
+ diff default.
+ :param staged: Whether to limit the diff to staged changes.
+ :yield: The file changes.
+ """
+ args = ["diff", "-z", "--raw"]
+ if staged:
+ args.append("--staged")
+ if from_commit:
+ args.append(from_commit)
+ raw_output = git_get(args, strip=False)
+ while raw_output:
+ file_change, end = parse_raw_diff_line(raw_output)
+ yield file_change
+ raw_output = raw_output[end:]
+
+
+def file_contains(filename: str, regex: re.Pattern[str]) -> bool:
+ """
+ Return whether the file is a utf-8 text file containing the regular
+ expression given by 'regex'.
+ :param filename: The file path.
+ :param regex: The pattern to search for.
+ :returns: Whether the pattern was matched.
+ """
+ with open(filename, encoding="utf-8") as file:
+ try:
+ for line in file:
+ if regex.search(line):
+ return True
+ except UnicodeDecodeError:
+ # Not a text file
+ pass
+ return False
+
+
+def get_gitlab_default() -> str:
+ """
+ Get the name of the default branch on gitlab.
+ :returns: The branch name.
+ """
+ repo_name = get_upstream_details().get("repo-name", None)
+ if repo_name is None:
+ raise TbDevException("Cannot determine the repository name")
+ query = f"""
+ query {{
+ project(fullPath: "tpo/applications/{repo_name}") {{
+ repository {{ rootRef }}
+ }}
+ }}
+ """
+ request_data = {"query": re.sub(r"\s+", "", query)}
+ gitlab_request = urllib.request.Request(
+ "https://gitlab.torproject.org/api/graphql",
+ headers={
+ "Content-Type": "application/json",
+ "User-Agent": "",
+ },
+ data=json.dumps(request_data).encode("ascii"),
+ )
+
+ with urllib.request.urlopen(gitlab_request, timeout=20) as response:
+ default = json.load(response)["data"]["project"]["repository"]["rootRef"]
+ assert isinstance(default, str)
+ return default
+
+
+def within_browser_root() -> bool:
+ """
+ :returns: Whether we are with the tor browser root.
+ """
+ root = get_local_root()
+ if not root:
+ return False
+ return os.path.commonpath([os.getcwd(), root]) == root
+
+
+# * -------------------- *
+# | Methods for commands |
+# * -------------------- *
+
+
+def show_firefox_commit(_args: argparse.Namespace) -> None:
+ """
+ Print the tag name and commit for the last firefox commit below the current
+ HEAD.
+ """
+ ref = get_firefox_ref("HEAD")
+ print(ref.full_name)
+ print(ref.commit)
+
+
+def show_upstream_basis_commit(_args: argparse.Namespace) -> None:
+ """
+ Print the last upstream commit for the current HEAD.
+ """
+ print(get_upstream_basis_commit("HEAD"))
+
+
+def show_log(args: argparse.Namespace) -> None:
+ """
+ Show the git log between the current HEAD and the last firefox commit.
+ """
+ commit = get_firefox_ref("HEAD").commit
+ git_run(["log", f"{commit}..HEAD", *args.gitargs], check=False)
+
+
+def show_files_containing(args: argparse.Namespace) -> None:
+ """
+ List all the files that that have been modified for tor browser, that also
+ contain a regular expression.
+ """
+ try:
+ regex = re.compile(args.regex)
+ except re.error as err:
+ raise TbDevException(f"{args.regex} is not a valid python regex") from err
+
+ for file_change in get_changed_files(get_firefox_ref("HEAD").commit):
+ path = file_change.new_path
+ if not os.path.isfile(path):
+ # deleted ofile
+ continue
+ if file_contains(path, regex):
+ print(path)
+
+
+def show_changed_files(_args: argparse.Namespace) -> None:
+ """
+ List all the files that have been modified relative to upstream.
+ """
+ for file_change in get_changed_files(get_upstream_basis_commit("HEAD")):
+ print(file_change.new_path)
+
+
+def lint_changed_files(args: argparse.Namespace) -> None:
+ """
+ Lint all the files that have been modified relative to upstream.
+ """
+ os.chdir(get_local_root())
+ file_list = [
+ f.new_path
+ for f in get_changed_files(get_upstream_basis_commit("HEAD"))
+ if os.path.isfile(f.new_path) # Not deleted
+ ]
+ # We add --warnings since clang only reports whitespace issues as warnings.
+ subprocess.run(
+ ["./mach", "lint", "--warnings", "soft", *args.lintargs, *file_list],
+ check=False,
+ )
+
+
+# TODO: replace with "prompt_user[T](..., T]) -> T" after python 3.12 is the
+# minimum mach version.
+T = TypeVar("T")
+
+
+def prompt_user(prompt: str, convert: Callable[[str], T]) -> T:
+ """
+ Ask the user for some input.
+ :param prompt: The prompt to show the user.
+ :param convert: A method to convert the response into a type. Should
+ throw `ValueError` if the user should be re-prompted for a valid input.
+ :returns: The first valid user response.
+ """
+ while True:
+ # Flush out stdin.
+ termios.tcflush(sys.stdin, termios.TCIFLUSH)
+ print(prompt, end="")
+ sys.stdout.flush()
+ try:
+ return convert(sys.stdin.readline().strip())
+ except ValueError:
+ # Continue to prompt.
+ pass
+
+
+def binary_reply_default_no(value: str) -> bool:
+ """
+ Process a 'y' or 'n' reply, defaulting to 'n' if empty.
+ :param value: The user input.
+ :returns: Whether the answer is yes.
+ """
+ if value == "":
+ return False
+ if value.lower() == "y":
+ return True
+ if value.lower() == "n":
+ return False
+ raise ValueError()
+
+
+class FixupTarget:
+ """Represents a commit that can be targeted by a fixup."""
+
+ def __init__(self, commit: str, short_ref: str, title: str) -> None:
+ """
+ :param commit: The commit hash for the commit.
+ :param short_ref: The shortened commit hash for display.
+ :param title: The first line of the commit message.
+ """
+ self.commit = commit
+ self.short_ref = short_ref
+ self.title = title
+ self.changes: list[FileChange] = []
+ self.fixups: list[FixupTarget] = []
+ self.target: None | FixupTarget = None
+
+ _FIXUP_REGEX = re.compile(r"^fixup! +")
+
+ def trim_fixup(self) -> tuple[str, int]:
+ """
+ Trim the "fixup!" prefixes.
+ :returns: The stripped commit title and the fixup depth (how many fixups
+ prefixes there were).
+ """
+ title = self.title
+ depth = 0
+ while True:
+ match = self.__class__._FIXUP_REGEX.match(title)
+ if not match:
+ return title, depth
+ title = title[match.end() :]
+ depth += 1
+
+ def touches_path(
+ self, path: str, filter_status: None | str = None, check_dir: bool = False
+ ) -> bool:
+ """
+ Whether this target, or one of its fixups or target, touches the given
+ path.
+ :param path: The path to check.
+ :param filter_status: Limit the detected changes to the given status(es).
+ :param check_dir: Whether we should treat `path` as a directory and check for
+ files within it.
+ :returns: Whether this target matches.
+ """
+ # NOTE: In the case of renames, we generally assume that renames occur
+ # in the fixup targets. E.g. "Commit 1" creates the file "file.txt", and
+ # "fixup! Commit 1" renames it to "new.txt". In this case, if the
+ # FixupTarget for "Commit 1" is passed in "file.txt" it will match. And
+ # if it is passed in "new.txt" it will also match via the self.fixups
+ # field, which will include the "fixup! Commit 1" rename.
+ # But the "fixup ! Commit 1" FixupTargets will only match with
+ # "file.txt" if they occurred before the rename fixup, and will only
+ # match with "new.txt" if they occur after the rename fixup. With the
+ # exception of the rename fixup itself, which will match both.
+ #
+ # In principle, we could identify a file across renames (have a mapping
+ # from each commit to what the file is called at that stage) and match
+ # using this file identifier. Similar to the "--follow" git diff
+ # argument. This would then cover cases where a rename occurs between
+ # the commit and its fixups, and allow fixups before the rename to also
+ # match. However, the former case is unexpected and the latter case
+ # would not be that useful.
+ if self._touches_path_basis(path, filter_status, check_dir):
+ return True
+ # Mark this as a valid target for the path if one of our fixups changes
+ # this path.
+ # NOTE: We use _touch_path_basis to prevent recursion. This means we
+ # will only check one layer up or down, but we only expect fixups of
+ # up to depth 1.
+ for fixup_target in self.fixups:
+ if fixup_target._touches_path_basis(path, filter_status, check_dir):
+ return True
+ # Mark this as a valid target if our target changes this path.
+ if self.target is not None and self.target._touches_path_basis(
+ path, filter_status, check_dir
+ ):
+ return True
+ return False
+
+ def _touches_path_basis(
+ self, path: str, filter_status: None | str, check_dir: bool
+ ) -> bool:
+ """
+ Whether this target touches the given path.
+ :param path: The path to check.
+ :param filter_status: Limit the detected changes to the given status.
+ :param check_dir: Whether we should treat `path` as a directory and check for
+ files within it.
+ :returns: Whether this target matches.
+ """
+ for file_change in self.changes:
+ if filter_status is not None and file_change.status not in filter_status:
+ continue
+ for test_path in (file_change.path, file_change.new_path):
+ if check_dir:
+ if os.path.commonpath((os.path.dirname(test_path), path)) == path:
+ # test_path's directory matches the path or is within it.
+ return True
+ elif test_path == path:
+ return True
+ return False
+
+
+def get_fixup_targets(
+ target_list: list[FixupTarget],
+ from_commit: str,
+ to_commit: str,
+ fixup_depth: int = 0,
+) -> None:
+ """
+ Find all the commits that can be targeted by a fixup between the given
+ commits.
+ :param target_list: The list to fill with targets. Appended in the order of
+ `from_commit` to `to_commit`.
+ :param from_commit: The commit to start from (non-inclusive).
+ :param to_commit: The commit to end on (inclusive).
+ :param fixup_depth: The maximum "depth" of fixups. I.e. how many "fixup!"
+ prefixes to allow.
+ """
+ raw_output = git_get(
+ [
+ "log",
+ "--pretty=format:%H,%h,%s",
+ "--reverse",
+ "--raw",
+ "-z",
+ f"{from_commit}..{to_commit}",
+ ],
+ strip=False,
+ )
+ pretty_regex = re.compile(
+ r"(?P<commit>[0-9a-f]+),(?P<short_ref>[0-9a-f]+),(?P<title>[^\n\0]*)\n"
+ )
+ excluded_regex_list = [
+ re.compile(r"^Bug [0-9]+.*r="), # Backported Mozilla bug.
+ re.compile(r"^dropme! "),
+ ]
+
+ while raw_output:
+ match = pretty_regex.match(raw_output)
+ if not match:
+ raise ValueError(f"Invalid pretty format: {raw_output[:100]}...")
+ fixup_target = FixupTarget(
+ match.group("commit"), match.group("short_ref"), match.group("title")
+ )
+ raw_output = raw_output[match.end() :]
+ while raw_output and raw_output[0] != "\0":
+ file_change, end = parse_raw_diff_line(raw_output)
+ fixup_target.changes.append(file_change)
+ raw_output = raw_output[end:]
+ if raw_output:
+ # Skip over the "\0".
+ raw_output = raw_output[1:]
+
+ for regex in excluded_regex_list:
+ if regex.match(fixup_target.title):
+ # Exclude from the list.
+ continue
+
+ trimmed_title, depth = fixup_target.trim_fixup()
+ if depth:
+ original_target = None
+ for target in target_list:
+ if target.title == trimmed_title:
+ original_target = target
+ break
+
+ if original_target:
+ original_target.fixups.append(fixup_target)
+ fixup_target.target = original_target
+ if depth > fixup_depth:
+ # Exclude from the list.
+ continue
+
+ target_list.append(fixup_target)
+
+
+class NewCommitBasis:
+ def __init__(self) -> None:
+ self.staged_paths: set[str] = set()
+ self.adding_paths: set[str] = set()
+
+ def add(self, paths: Iterable[str], staged: bool) -> None:
+ """
+ Add a path to include in this commit.
+ :param paths: The paths to add.
+ :param staged: Whether we are adding already staged changes.
+ """
+ if staged:
+ self.staged_paths.update(paths)
+ return
+
+ self.adding_paths.update(paths)
+
+
+class NewCommit(NewCommitBasis):
+ """Represents a new commit that we want to create."""
+
+ def __init__(self, alias: str) -> None:
+ """
+ :param alias: The alias name for the commit.
+ """
+ super().__init__()
+ self.alias = alias
+
+
+class NewFixup(NewCommitBasis):
+ """Represents a new fixup commit that we want to create."""
+
+ def __init__(self, target: FixupTarget) -> None:
+ """
+ :param target: The commit to target with the fixup.
+ """
+ super().__init__()
+ self.target = target
+
+
+def get_suggested_fixup_targets_for_change(
+ file_change: FileChange,
+ fixup_target_list: list[FixupTarget],
+ firefox_directories_lazy: Callable[[], set[str]],
+) -> Iterator[FixupTarget]:
+ """
+ Find the suggested fixup targets for the given file change.
+ :param file_change: The file change to get a suggestion for.
+ :param fixup_target_list: The list to choose from.
+ :param firefox_directories_lazy: Lazy method to return the firefox
+ directories.
+ :yield: The suggested fixup targets.
+ """
+
+ def filter_list(
+ path: str, filter_status: None | str = None, check_dir: bool = False
+ ) -> Iterator[FixupTarget]:
+ return (
+ t
+ for t in fixup_target_list
+ if t.touches_path(path, filter_status=filter_status, check_dir=check_dir)
+ )
+
+ if file_change.status == "D":
+ # Deleted.
+ # Find the commit that introduced this file or previously deleted it.
+ # I.e. added the file ("A"), renamed it ("R"), or deleted it ("D").
+ yield from filter_list(file_change.path, filter_status="ARD")
+ return
+
+ if file_change.status == "A":
+ # First check to see if this file name was actually touched before.
+ yielded_target = False
+ for target in filter_list(file_change.path):
+ yielded_target = True
+ yield target
+ if yielded_target:
+ return
+ # Else, find commits that introduced files in the same directory, or
+ # deleted in them, if they are not firefox directories.
+ dir_path = file_change.path
+ while True:
+ dir_path = os.path.dirname(dir_path)
+ if not dir_path or dir_path in firefox_directories_lazy():
+ return
+
+ yielded_target = False
+ for target in filter_list(dir_path, filter_status="ARD", check_dir=True):
+ yielded_target = True
+ yield target
+
+ if yielded_target:
+ return
+ # Else, search one directory higher.
+
+ if file_change.status == "R":
+ # Renamed.
+ # Find the commit that introduced the original name for this file.
+ yield from filter_list(file_change.path, filter_status="AR")
+ return
+
+ # Modified.
+ yield from filter_list(file_change.path)
+
+
+def ask_for_target(
+ file_change_list: list[FileChange],
+ new_commits_list: list[NewCommit | NewFixup],
+ suggested_fixup_target_list: list[FixupTarget],
+ full_fixup_target_list: list[FixupTarget],
+ staged: bool = False,
+) -> bool:
+ """
+ Ask the user to choose a target.
+ :param file_change_list: The file changes to ask for.
+ :param new_commits_list: The list of pending new commits, may be added to.
+ :param suggested_fixup_target_list: The list of suggested target fixups
+ to choose from.
+ :param staged: Whether this is for staged changes.
+ :returns: `True` if the operation should be aborted.
+ """
+
+ new_paths = [c.new_path for c in file_change_list]
+ all_paths = set(new_paths).union(c.path for c in file_change_list)
+ non_fixup_commits: list[NewCommit] = [
+ n for n in new_commits_list if isinstance(n, NewCommit)
+ ]
+
+ shown_list: list[NewCommit | FixupTarget] = (
+ non_fixup_commits + suggested_fixup_target_list
+ )
+
+ can_skip = not staged
+ shown_full = False
+
+ index_offset = 2
+
+ def valid_response(val: str) -> tuple[str, None | NewCommit | FixupTarget]:
+ val = val.strip()
+
+ if val == "h":
+ return "help", None
+
+ if val == "a":
+ return "abort", None
+
+ if val == "d":
+ return "diff", None
+
+ if val == "f":
+ if shown_full:
+ # Already done once.
+ raise ValueError()
+ return "full-list", None
+
+ is_patch_full = val.startswith("P")
+ is_patch = val.startswith("p")
+ if is_patch or is_patch_full:
+ index = int(val[1:], base=10) # Raises ValueError if not integer.
+ else:
+ index = int(val, base=10) # Raises ValueError if not integer.
+ if index == 0:
+ if not can_skip:
+ raise ValueError()
+ return "skip", None
+
+ if index == 1:
+ return "new", None
+
+ index -= index_offset
+
+ if index < 0 or index >= len(shown_list):
+ raise ValueError()
+
+ selected = shown_list[index]
+
+ if is_patch_full:
+ return "patch-full", selected
+ if is_patch:
+ return "patch", selected
+ return "target", selected
+
+ def alias_response(val: str) -> str:
+ # Choose a default alias name if none is given.
+ val = val.strip() or f"New commit {len(non_fixup_commits)}"
+ for new_commit in non_fixup_commits:
+ if new_commit.alias == val:
+ # Already in use.
+ raise ValueError()
+ return val
+
+ def print_index_option(index: int, description: str) -> None:
+ print(f" \x1b[1m{index}\x1b[0m: {description}")
+
+ def in_pink(text: str) -> str:
+ return f"\x1b[1;38;5;212m{text}\x1b[0m"
+
+ prefix_str = "For " + (in_pink("staged") if staged else "unstaged") + " changes to"
+ if len(new_paths) == 1:
+ print(f"{prefix_str} {in_pink(new_paths[0])}:")
+ else:
+ print(f"{prefix_str}:")
+ for path in new_paths:
+ print(f" {in_pink(path)}")
+ print("")
+
+ show_help = True
+ reshow_list = True
+ while True:
+ if reshow_list:
+ if can_skip:
+ print_index_option(0, "Skip")
+ print_index_option(1, "New commit")
+ for index, target in enumerate(shown_list, start=index_offset):
+ if isinstance(target, NewCommit):
+ print_index_option(index, f"Add to new commit: {target.alias}")
+ else:
+ print_index_option(
+ index, f"Fixup: {in_pink(target.short_ref)} {target.title}"
+ )
+ reshow_list = False
+ print("")
+
+ response, selected = prompt_user(
+ (
+ "Choose an <index> to target. Type 'h' for additional options: "
+ if show_help
+ else "Choose an <index> to target or an option: "
+ ),
+ valid_response,
+ )
+
+ if response == "help":
+ print("Options:")
+ for option, desc in (
+ ("h", "show the available options."),
+ ("a", "abort this commit operation and all pending commits."),
+ (
+ ("", "")
+ if shown_full
+ else (
+ "f",
+ "show the full list of fixup targets, rather than just the suggested ones.",
+ )
+ ),
+ ("d", "view the diff for the pending file changes."),
+ (
+ "P<index>",
+ "view the patch for the index (including its relevant fixups).",
+ ),
+ (
+ "p<index>",
+ "view the patch for the index (including its relevant fixups), "
+ "limited to the current files.",
+ ),
+ ):
+ if not option:
+ # Skip this option.
+ continue
+ print(f" \x1b[1m{option[0]}\x1b[0m{option[1:].ljust(7)}: {desc}")
+ # Do not show the help option again.
+ show_help = False
+ continue
+
+ if response == "abort":
+ return True
+
+ if response == "skip":
+ return False
+
+ if response == "new":
+ new_alias = prompt_user(
+ "Enter an optional temporary alias for this new commit: ",
+ alias_response,
+ )
+ new_commit = NewCommit(new_alias)
+ new_commit.add(all_paths, staged)
+ new_commits_list.append(new_commit)
+ return False
+
+ if response == "target":
+ assert selected is not None
+
+ if isinstance(selected, NewCommit):
+ # Adding to a new commit.
+ selected.add(all_paths, staged)
+ return False
+
+ for new_fixup in new_commits_list:
+ if not isinstance(new_fixup, NewFixup):
+ continue
+ if new_fixup.target == selected:
+ # We already have a pending fixup commit that targets this
+ # selected target. Add this path to the same commit.
+ new_fixup.add(all_paths, staged)
+ return False
+
+ new_fixup = NewFixup(selected)
+ new_fixup.add(all_paths, staged)
+ new_commits_list.append(new_fixup)
+ return False
+
+ if response == "full-list":
+ shown_list = non_fixup_commits + full_fixup_target_list
+ shown_full = True
+ reshow_list = True
+ continue
+
+ if response == "diff":
+ git_args = ["diff", "--color"]
+ if staged:
+ git_args.append("--staged")
+ git_args.extend(git_path_args(all_paths))
+ git_run_pager(git_args)
+ continue
+
+ if response in ("patch", "patch-full"):
+ assert selected is not None
+
+ filter_paths = response == "patch"
+
+ if isinstance(selected, NewCommit):
+ git_sequence = [
+ ["diff", "--color", "--staged", *git_path_args((path,))]
+ for path in selected.staged_paths
+ if not filter_paths or path in all_paths
+ ]
+ git_sequence.extend(
+ ["diff", "--color", *git_path_args((path,))]
+ for path in selected.adding_paths
+ if not filter_paths or path in all_paths
+ )
+
+ # Show what the expected patch will be for the new commit.
+ git_run_pager(
+ arg_sequence=git_sequence, pager_prefix=f"{selected.alias}\n\n"
+ )
+ else:
+ # Show the log entry for the FixupTarget and each of its fixups.
+ # Order with the commmit closest to HEAD first. We expect
+ # selected.fixups to match this order.
+ git_sequence = []
+ # If `filter_paths` is set, we want to limit the log to the
+ # paths, and try to track any renames in the commit history.
+ prev_log_paths: None | set[str] = None
+ # For the first commit in the sequence, we use the old path
+ # names (rather than `c.new_path`) since we expect the commit
+ # which is closest to us to use the older names.
+ log_paths: None | set[str] = (
+ {c.path for c in file_change_list} if filter_paths else None
+ )
+ for target in (*selected.fixups, selected):
+ git_args = [
+ "log",
+ "--color",
+ "-p",
+ f"{target.commit}~1..{target.commit}",
+ ]
+ if filter_paths:
+ assert log_paths is not None
+ # Track the renamed paths.
+ prev_log_paths = log_paths.copy()
+ for file_change in target.changes:
+ if (
+ file_change.status == "R"
+ and file_change.new_path in log_paths
+ ):
+ # file was renamed in this change.
+ # Update log_paths to the new name.
+ # NOTE: This should have a similar effect to the
+ # --follow option for git log for a single file
+ # NOTE: File renames will not be properly
+ # tracked if a rename occurs outside of
+ # `selected.changes` or
+ # `selected.fixups[].changes`, but this is
+ # unexpected.
+ log_paths.remove(file_change.new_path)
+ log_paths.add(file_change.path)
+
+ # NOTE: This log entry may be empty if none of the paths
+ # match.
+ # NOTE: We include both log_paths and prev_log_paths to
+ # show renames in the diff output.
+ git_args.extend(git_path_args(log_paths | prev_log_paths))
+ git_sequence.append(git_args)
+ # Combine all the logs into one.
+ git_run_pager(arg_sequence=git_sequence)
+ continue
+
+ raise ValueError(f"Unexpected response: {response}")
+
+
+def auto_commit(_args: argparse.Namespace) -> None:
+ """
+ Automatically find and fix up commits for any pending changes.
+ """
+ # Want git log and add to be run from the root.
+ os.chdir(get_local_root())
+ # Only want to search as far back as the firefox commit.
+ firefox_commit = get_firefox_ref("HEAD").commit
+
+ staged_changes = [f for f in get_changed_files(staged=True)]
+ if staged_changes:
+ print("Existing staged changes for:")
+ for file_change in staged_changes:
+ print(f" {file_change.new_path}")
+ if not prompt_user(
+ "Include staged changes? (y/\x1b[4mn\x1b[0m)", binary_reply_default_no
+ ):
+ raise TbDevException("Cannot continue with pending staged changes")
+ print("")
+
+ full_target_list: list[FixupTarget] = []
+ # Determine if HEAD points to a branch or not and has an upstream commit.
+ # We choose check=False since the exit status is non-zero when we are in a
+ # detached state.
+ head_symbolic_ref = git_get(["symbolic-ref", "-q", "HEAD"], check=False)
+ if not head_symbolic_ref or not bool(
+ git_get(["for-each-ref", "--format=%(upstream)", head_symbolic_ref])
+ ):
+ # Unexpected, but not fatal.
+ print("HEAD has no upstream tracking!")
+ # Just include all commits since firefox_commit with no fixup depth
+ get_fixup_targets(full_target_list, firefox_commit, "HEAD", fixup_depth=0)
+ else:
+ upstream_commit = get_upstream_basis_commit("HEAD")
+ # Only include "fixup!" commits that are between here and the upstream
+ # tracking commit.
+ get_fixup_targets(
+ full_target_list, firefox_commit, upstream_commit, fixup_depth=0
+ )
+ get_fixup_targets(full_target_list, upstream_commit, "HEAD", fixup_depth=1)
+
+ # full_target_list is ordered with the earlier commits first. Reverse this.
+ full_target_list.reverse()
+ # Also reverse the fixups order to follow the same order.
+ for target in full_target_list:
+ target.fixups.reverse()
+
+ # Lazy load the list of firefox directories since they are unlikely to be
+ # needed.
+ @functools.cache
+ def firefox_directories_lazy() -> set[str]:
+ return {
+ dir_name
+ for dir_name in git_get(
+ [
+ "ls-tree",
+ "-r",
+ "-d",
+ "--name-only",
+ "--full-tree",
+ "-z",
+ firefox_commit,
+ ],
+ strip=False,
+ ).split("\0")
+ if dir_name
+ }
+
+ # Check untracked files to be added.
+ for path in git_get(
+ ["ls-files", "--other", "--exclude-standard", "-z"], strip=False
+ ).split("\0"):
+ if not path:
+ continue
+ if prompt_user(
+ f"Start tracking file `{path}`? (y/\x1b[4mn\x1b[0m)",
+ binary_reply_default_no,
+ ):
+ # Include in the git diff output, but do not stage.
+ git_run(["add", "--intent-to-add", path])
+ print("")
+
+ aborted = False
+ new_commits_list: list[NewCommit | NewFixup] = []
+ # First go through staged changes.
+ if staged_changes:
+ common_fixup_targets = None
+ for change in staged_changes:
+ target_iter = get_suggested_fixup_targets_for_change(
+ change, full_target_list, firefox_directories_lazy
+ )
+ if common_fixup_targets is None:
+ common_fixup_targets = set(target_iter)
+ else:
+ common_fixup_targets.intersection_update(target_iter)
+
+ assert common_fixup_targets is not None
+
+ aborted = ask_for_target(
+ staged_changes,
+ new_commits_list,
+ # Sort in the same order as full_target_list.
+ [target for target in full_target_list if target in common_fixup_targets],
+ full_target_list,
+ staged=True,
+ )
+ print("")
+
+ if not aborted:
+ for file_change in get_changed_files():
+ target_list = list(
+ get_suggested_fixup_targets_for_change(
+ file_change, full_target_list, firefox_directories_lazy
+ )
+ )
+ aborted = ask_for_target(
+ [file_change],
+ new_commits_list,
+ target_list,
+ full_target_list,
+ staged=False,
+ )
+ print("")
+ if aborted:
+ break
+
+ if aborted:
+ return
+
+ # NOTE: Only the first commit can include staged changes.
+ # This should already be the case, but we want to double check.
+ for commit_index in range(1, len(new_commits_list)):
+ if new_commits_list[commit_index].staged_paths:
+ raise ValueError(f"Staged changes for commit {commit_index}")
+
+ for new_commit in new_commits_list:
+ print("")
+ if new_commit.adding_paths:
+ git_run(["add", *git_path_args(new_commit.adding_paths)])
+ if isinstance(new_commit, NewFixup):
+ git_run(["commit", f"--fixup={new_commit.target.commit}"])
+ print("")
+ is_double_fixup = bool(new_commit.target.target)
+ if not is_double_fixup and prompt_user(
+ "Edit fixup commit message? (y/\x1b[4mn\x1b[0m)",
+ binary_reply_default_no,
+ ):
+ git_run(["commit", "--amend"])
+ print("")
+ else:
+ git_run(["commit", "-m", new_commit.alias])
+ git_run(["commit", "--amend"])
+ print("")
+
+
+def clean_fixups(_args: argparse.Namespace) -> None:
+ """
+ Perform an interactive rebase that automatically applies fixups, similar to
+ --autosquash but also works on fixups of fixups.
+ """
+ user_editor = git_get(["var", "GIT_SEQUENCE_EDITOR"])
+ sub_editor = os.path.join(
+ os.path.dirname(os.path.realpath(__file__)), FIXUP_PREPROCESSOR_EDITOR
+ )
+
+ git_run(
+ ["rebase", "--interactive"],
+ check=False,
+ env={"GIT_SEQUENCE_EDITOR": sub_editor, USER_EDITOR_ENV_NAME: user_editor},
+ )
+
+
+def show_default(_args: argparse.Namespace) -> None:
+ """
+ Print the default branch name from gitlab.
+ """
+ default_branch = get_gitlab_default()
+ upstream = get_upstream_details().get("remote", None)
+ if upstream is None:
+ raise TbDevException("Cannot determine the upstream remote")
+ print(f"{upstream}/{default_branch}")
+
+
+def branch_from_default(args: argparse.Namespace) -> None:
+ """
+ Fetch the default gitlab branch from upstream and create a new local branch.
+ """
+ default_branch = get_gitlab_default()
+ upstream = get_upstream_details().get("remote", None)
+ if upstream is None:
+ raise TbDevException("Cannot determine the upstream remote")
+
+ git_run(["fetch", upstream, default_branch])
+ git_run(
+ [
+ "switch",
+ "--create",
+ args.branchname,
+ "--track",
+ f"{upstream}/{default_branch}",
+ ]
+ )
+
+
+def move_to_default(args: argparse.Namespace) -> None:
+ """
+ Fetch the default gitlab branch from upstream and move the specified
+ branch's commits on top. A new branch will be created tracking the default
+ branch, and the old branch will be renamed with a suffix for the old
+ tracking branch. This method will switch to the new branch, but will avoid
+ switching to the old branch to prevent triggering a CLOBBER build.
+ """
+ branch_name = args.branch
+ if branch_name is None:
+ # Use current branch as default.
+ try:
+ branch_name = git_get(["branch", "--show-current"])
+ except IndexError:
+ raise TbDevException("No current branch")
+
+ current_upstream_branch = get_upstream_tracking_branch(branch_name)
+ default_branch = get_gitlab_default()
+ upstream = get_upstream_details().get("remote", None)
+ if upstream is None:
+ raise TbDevException("Cannot determine the upstream remote")
+
+ git_run(["fetch", upstream, default_branch])
+
+ new_upstream_branch = f"{upstream}/{default_branch}"
+ if current_upstream_branch == new_upstream_branch:
+ print(
+ f"{branch_name} is already set to track the default branch {new_upstream_branch}."
+ )
+ return
+
+ # We want to avoid checking out the old branch because this can cause
+ # mozilla ./mach to do a CLOBBER build.
+ # Instead we create a new branch with the same name and cherry pick.
+ current_basis = get_upstream_basis_commit(branch_name)
+ old_branch_name = branch_name + "-" + get_firefox_ref(branch_name).name
+
+ print(f"Moving old branch {branch_name} to {old_branch_name}")
+ git_run(["branch", "-m", branch_name, old_branch_name])
+
+ try:
+ git_run(["switch", "--create", branch_name, "--track", new_upstream_branch])
+ except subprocess.CalledProcessError as err:
+ print(f"Moving {old_branch_name} back to {branch_name}")
+ git_run(["branch", "-m", old_branch_name, branch_name])
+ raise err
+
+ # Set check to False since cherry-pick might fail due to a merge conflict.
+ git_run(["cherry-pick", f"{current_basis}..{old_branch_name}"], check=False)
+
+
+def show_range_diff(args: argparse.Namespace) -> None:
+ """
+ Show the range diff between two branches, from their firefox bases.
+ """
+ firefox_commit_1 = get_firefox_ref(args.branch1).commit
+ firefox_commit_2 = get_firefox_ref(args.branch2).commit
+ git_run(
+ [
+ "range-diff",
+ f"{firefox_commit_1}..{args.branch1}",
+ f"{firefox_commit_2}..{args.branch2}",
+ ],
+ check=False,
+ )
+
+
+def show_diff_diff(args: argparse.Namespace) -> None:
+ """
+ Show the diff between the diffs of two branches, relative to their firefox
+ bases.
+ """
+ try:
+ diff_tool = next(git_lines(["config", "--get", "diff.tool"]))
+ except StopIteration:
+ raise TbDevException("No diff.tool configured for git")
+
+ # Filter out parts of the diff we expect to be different.
+ index_regex = re.compile(r"index [0-9a-f]{12}\.\.[0-9a-f]{12}")
+ lines_regex = re.compile(r"@@ -[0-9]+,[0-9]+ \+[0-9]+,[0-9]+ @@(?P<rest>.*)")
+
+ def save_diff(branch: str) -> str:
+ firefox_commit = get_firefox_ref(branch).commit
+ file_desc, file_name = tempfile.mkstemp(
+ text=True, prefix=f'{branch.split("/")[-1]}-'
+ )
+ # Register deleting the file at exit.
+ atexit.register(os.remove, file_name)
+
+ diff_process = subprocess.Popen(
+ [GIT_PATH, "diff", f"{firefox_commit}..{branch}"],
+ stdout=subprocess.PIPE,
+ text=True,
+ )
+
+ with os.fdopen(file_desc, "w") as file:
+ assert diff_process.stdout is not None
+ for line in diff_process.stdout:
+ if index_regex.match(line):
+ # Fake data that will match.
+ file.write("index ????????????..????????????\n")
+ continue
+ lines_match = lines_regex.match(line)
+ if lines_match:
+ # Fake data that will match.
+ file.write("@@ ?,? ?,? @@" + lines_match.group("rest"))
+ continue
+ file.write(line)
+
+ status = diff_process.wait()
+ if status != 0:
+ raise TbDevException(f"git diff exited with status {status}")
+
+ return file_name
+
+ file_1 = save_diff(args.branch1)
+ file_2 = save_diff(args.branch2)
+ subprocess.run([diff_tool, file_1, file_2], check=False)
+
+
+# * -------------------- *
+# | Command line parsing |
+# * -------------------- *
+
+
+def branch_complete(prefix: str, **_kwargs: Any) -> list[str]:
+ """
+ Complete the argument with a branch name.
+ """
+ if not within_browser_root():
+ return []
+ try:
+ branches = [ref.name for ref in get_refs("head", "")]
+ branches.extend(ref.name for ref in get_refs("remote", ""))
+ branches.append("HEAD")
+ except Exception:
+ return []
+ return [br for br in branches if br.startswith(prefix)]
+
+
+parser = argparse.ArgumentParser()
+subparsers = parser.add_subparsers(required=True)
+
+
+class ArgConfig(TypedDict):
+ help: str
+ metavar: NotRequired[str]
+ nargs: NotRequired[str]
+ completer: NotRequired[Callable[[str], list[str]]]
+
+
+class CommandConfig(TypedDict):
+ func: Callable[[argparse.Namespace], None]
+ args: NotRequired[dict[str, ArgConfig]]
+
+
+all_commands: dict[str, CommandConfig] = {
+ "show-upstream-basis-commit": {
+ "func": show_upstream_basis_commit,
+ },
+ "changed-files": {
+ "func": show_changed_files,
+ },
+ "lint-changed-files": {
+ "func": lint_changed_files,
+ "args": {
+ "lintargs": {
+ "help": "argument to pass to ./mach lint",
+ "metavar": "-- lint-arg",
+ "nargs": "*",
+ },
+ },
+ },
+ "auto-commit": {
+ "func": auto_commit,
+ },
+ "clean-fixups": {
+ "func": clean_fixups,
+ },
+ "show-default": {
+ "func": show_default,
+ },
+ "branch-from-default": {
+ "func": branch_from_default,
+ "args": {
+ "branchname": {
+ "help": "the name for the new local branch",
+ "metavar": "<branch-name>",
+ },
+ },
+ },
+ "move-to-default": {
+ "func": move_to_default,
+ "args": {
+ "branch": {
+ "help": "the branch to move, else uses the current branch",
+ "metavar": "<branch>",
+ "nargs": "?",
+ "completer": branch_complete,
+ },
+ },
+ },
+ "show-firefox-commit": {
+ "func": show_firefox_commit,
+ },
+ "log": {
+ "func": show_log,
+ "args": {
+ "gitargs": {
+ "help": "argument to pass to git log",
+ "metavar": "-- git-log-arg",
+ "nargs": "*",
+ },
+ },
+ },
+ "branch-range-diff": {
+ "func": show_range_diff,
+ "args": {
+ "branch1": {
+ "help": "the first branch to compare",
+ "metavar": "<branch-1>",
+ "completer": branch_complete,
+ },
+ "branch2": {
+ "help": "the second branch to compare",
+ "metavar": "<branch-2>",
+ "completer": branch_complete,
+ },
+ },
+ },
+ "branch-diff-diff": {
+ "func": show_diff_diff,
+ "args": {
+ "branch1": {
+ "help": "the first branch to compare",
+ "metavar": "<branch-1>",
+ "completer": branch_complete,
+ },
+ "branch2": {
+ "help": "the second branch to compare",
+ "metavar": "<branch-2>",
+ "completer": branch_complete,
+ },
+ },
+ },
+ "files-containing": {
+ "func": show_files_containing,
+ "args": {
+ "regex": {"help": "the regex that the files must contain"},
+ },
+ },
+}
+
+for name, command_config in all_commands.items():
+ help_message = command_config["func"].__doc__
+ assert isinstance(help_message, str)
+ help_message = re.sub(r"\s+", " ", help_message).strip()
+ sub = subparsers.add_parser(name, help=help_message)
+ sub.set_defaults(func=command_config["func"])
+ for arg, keywords in command_config.get("args", {}).items():
+ completer = None
+ if "completer" in keywords:
+ completer = keywords["completer"]
+ del keywords["completer"]
+ sub_arg = sub.add_argument(arg, **keywords)
+ if completer is not None and argcomplete is not None:
+ sub_arg.completer = completer # type: ignore
+
+if argcomplete is not None:
+ argcomplete.autocomplete(parser)
+
+try:
+ if not within_browser_root():
+ raise TbDevException("Must be within a browser directory")
+ parsed_args = parser.parse_args()
+
+ parsed_args.func(parsed_args)
+except TbDevException as err:
+ print(f"\x1b[1m{err}\x1b[0m", file=sys.stderr)