mercurial.py (15668B)
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this,
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

import errno
import os
import re
import shutil
import subprocess
from contextlib import contextmanager
from datetime import datetime
from pathlib import Path
from typing import Optional, Union

from mozpack.files import FileListFinder

from mozversioncontrol.errors import (
    CannotDeleteFromRootOfRepositoryException,
    MissingVCSExtension,
)
from mozversioncontrol.repo.base import Repository


class HgRepository(Repository):
    """An implementation of `Repository` for Mercurial repositories.

    Instances can be used as context managers: entering the context opens a
    persistent hglib command server that `_run` then uses instead of spawning
    a new `hg` process per command; leaving the context closes it.
    """

    def __init__(self, path: Path, hg="hg"):
        """Set up the repository at `path`, using `hg` as the Mercurial tool."""
        import hglib.client

        super().__init__(path, tool=hg)
        self._env["HGPLAIN"] = "1"

        # Setting this modifies a global variable and makes all future hglib
        # instances use this binary. Since the tool path was validated, this
        # should be OK. But ideally hglib would offer an API that defines
        # per-instance binaries.
        hglib.HGPATH = str(self._tool)

        # Without connect=False this spawns a persistent process. We want
        # the process lifetime tied to a context manager.
        self._client = hglib.client.hgclient(
            self.path, encoding="UTF-8", configs=None, connect=False
        )

    @property
    def name(self):
        """Short identifier of this VCS backend."""
        return "hg"

    @property
    def head_ref(self):
        """Node hash of the working directory parent (`.`)."""
        return self._run("log", "-r", ".", "-T", "{node}")

    def is_cinnabar_repo(self) -> bool:
        """Always `False` for a Mercurial repository."""
        return False

    @property
    def base_ref(self):
        """Node hash of the last public ancestor of `.`."""
        return self._run("log", "-r", "last(ancestors(.) and public())", "-T", "{node}")

    def base_ref_as_hg(self):
        """Return `base_ref`; it is already a Mercurial revision."""
        return self.base_ref

    def base_ref_as_commit(self):
        """Not implemented for Mercurial; always raises `Exception`."""
        raise Exception("unimplemented: convert hg rev to git rev")

    @property
    def branch(self):
        """Contents of `.hg/bookmarks.current`, or `None` if missing or empty."""
        bookmarks_fn = Path(self.path) / ".hg" / "bookmarks.current"
        if bookmarks_fn.exists():
            with open(bookmarks_fn) as f:
                bookmark = f.read()
                return bookmark or None

        return None

    def __enter__(self):
        """Open the hglib command server (if not already open) and return self."""
        if self._client.server is None:
            # The cwd of the spawned process should be the repo root to ensure
            # relative paths are normalized to it.
            old_cwd = Path.cwd()
            try:
                os.chdir(self.path)
                self._client.open()
            finally:
                os.chdir(old_cwd)

        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the hglib command server."""
        self._client.close()

    def _run(self, *args, **runargs):
        """Run an `hg` command and return its output.

        Without an open command server this defers to the base class
        implementation. With one, the command is sent through hglib.

        `encoding` (keyword, default ``"utf-8"``) selects how the command
        server output is decoded; ``encoding=None`` returns the raw bytes,
        which `get_commit_patches` relies on.
        """
        if not self._client.server:
            return super()._run(*args, **runargs)

        # NOTE(review): other keyword arguments (e.g. `return_codes`) are not
        # applied on the command-server path -- confirm whether callers that
        # pass them can run inside the context manager.

        # hglib requires bytes on python 3
        args = [a.encode("utf-8") if not isinstance(a, bytes) else a for a in args]
        output = self._client.rawcommand(args)

        encoding = runargs.get("encoding", "utf-8")
        if encoding is None:
            return output
        return output.decode(encoding)

    def get_commit_time(self):
        """Return the timestamp of the newest non-draft ancestor of `.` as an int.

        Raises `RuntimeError` when no such revision is found.
        """
        newest_public_revision_time = self._run(
            "log",
            "--rev",
            "heads(ancestors(.) and not draft())",
            "--template",
            "{word(0, date|hgdate)}",
            "--limit",
            "1",
        ).strip()

        if not newest_public_revision_time:
            raise RuntimeError(
                "Unable to find a non-draft commit in this hg "
                "repository. If you created this repository from a "
                'bundle, have you done a "hg pull" from hg.mozilla.org '
                "since?"
            )

        return int(newest_public_revision_time)

    def sparse_checkout_present(self):
        """Return whether `.hg/sparse` exists and is non-empty."""
        # We assume a sparse checkout is enabled if the .hg/sparse file
        # has data. Strictly speaking, we should look for a requirement in
        # .hg/requires. But since the requirement is still experimental
        # as of Mercurial 4.3, it's probably more trouble than its worth
        # to verify it.
        sparse = Path(self.path) / ".hg" / "sparse"

        try:
            st = sparse.stat()
            return st.st_size > 0
        except OSError as e:
            # A missing file simply means "no sparse checkout"; anything else
            # is a real error.
            if e.errno != errno.ENOENT:
                raise

        return False

    def get_user_email(self):
        """Return the email address from `ui.username`, or `None` if unavailable."""
        # Output is in the form "First Last <flast@mozilla.com>"
        username = self._run("config", "ui.username", return_codes=[0, 1])
        if not username:
            # No username is set
            return None
        match = re.search(r"<(.*)>", username)
        if not match:
            # "ui.username" doesn't follow the "Full Name <email@domain>" convention
            return None
        return match.group(1)

    def _format_diff_filter(self, diff_filter, for_status=False):
        """Lower-case `diff_filter` and adapt it for `hg status` if requested."""
        df = diff_filter.lower()
        assert all(f in self._valid_diff_filter for f in df)

        # When looking at the changes in the working directory, the hg status
        # command uses 'd' for files that have been deleted with a non-hg
        # command, and 'r' for files that have been `hg rm`ed. Use both.
        return df.replace("d", "dr") if for_status else df

    def _files_template(self, diff_filter):
        """Build an `hg log` template listing files matching `diff_filter`.

        Each selected category (adds, deletes, modifications) contributes one
        file name per line.
        """
        template = ""
        df = self._format_diff_filter(diff_filter)
        if "a" in df:
            template += "{file_adds % '{file}\\n'}"
        if "d" in df:
            template += "{file_dels % '{file}\\n'}"
        if "m" in df:
            template += "{file_mods % '{file}\\n'}"
        return template

    def get_changed_files(self, diff_filter="ADM", mode="unstaged", rev=None):
        """Return the list of changed files matching `diff_filter`.

        With `rev=None` the working directory is inspected via `hg status`;
        otherwise the files touched by `rev` are listed via `hg log`.
        `mode` is not used by this implementation.
        """
        if rev is None:
            # Use --no-status to print just the filename.
            df = self._format_diff_filter(diff_filter, for_status=True)
            return self._run("status", "--no-status", f"-{df}").splitlines()
        else:
            template = self._files_template(diff_filter)
            return self._run("log", "-r", rev, "-T", template).splitlines()

    def get_outgoing_files(self, diff_filter="ADM", upstream=None):
        """Return files changed in outgoing commits, filtered by `diff_filter`.

        Without `upstream`, draft ancestors of `.` are considered; otherwise
        `hg outgoing` against `upstream` is used.
        """
        template = self._files_template(diff_filter)

        if not upstream:
            output = self._run(
                "log", "-r", "draft() and ancestors(.)", "--template", template
            )
        else:
            output = self._run(
                "outgoing",
                "-r",
                ".",
                "--quiet",
                "--template",
                template,
                upstream,
                return_codes=(1,),
            )

        # The template emits one file per line. Split on lines (not on
        # arbitrary whitespace) so file names containing spaces stay intact.
        return [line for line in output.splitlines() if line]

    def add_remove_files(self, *paths: Union[str, Path], force: bool = False):
        """Run `hg addremove` on `paths`; a no-op when no paths are given.

        `force` is not used by this implementation.
        """
        if not paths:
            return

        paths = [str(path) for path in paths]

        args = ["addremove"] + paths
        # Compare the version as an integer tuple; a float comparison would
        # misorder versions such as "3.10" (3.1 as a float).
        m = re.search(r"(\d+)\.(\d+)", self.tool_version)
        version = (int(m.group(1)), int(m.group(2))) if m else (0, 0)
        if version >= (3, 9):
            args = ["--config", "extensions.automv="] + args
        self._run(*args)

    def forget_add_remove_files(self, *paths: Union[str, Path]):
        """Run `hg forget` on `paths`; a no-op when no paths are given."""
        if not paths:
            return

        paths = [str(path) for path in paths]

        self._run("forget", *paths)

    def get_tracked_files_finder(self, path=None):
        """Return a `FileListFinder` over the output of `hg files`.

        `path` is not used by this implementation.
        """
        # Can return backslashes on Windows. Normalize to forward slashes.
        files = [
            p.replace("\\", "/") for p in self._run("files", "-0").split("\0") if p
        ]
        return FileListFinder(files)

    def get_ignored_files_finder(self):
        """Return a `FileListFinder` over the files listed by `hg status -i`."""
        # Each line is "<status> <path>". Split only once so paths that
        # contain spaces are kept whole.
        # Can return backslashes on Windows. Normalize to forward slashes.
        files = [
            line.split(" ", 1)[-1].replace("\\", "/")
            for line in self._run("status", "-i").split("\n")
            if line
        ]
        return FileListFinder(files)

    def diff_stream(self, rev=None, extensions=(), exclude_file=None, context=8):
        """Return a stream of `hg diff` output.

        Diffs the change introduced by `rev` when given, otherwise against
        `.^`. `extensions` restricts the diff to matching file suffixes,
        `exclude_file` names a list file of paths to exclude and `context`
        is the number of context lines.
        """
        args = ["diff", f"-U{context}"]
        if rev:
            args += ["-c", rev]
        else:
            args += ["-r", ".^"]
        for dot_extension in extensions:
            args += ["--include", f"glob:**{dot_extension}"]
        if exclude_file is not None:
            args += ["--exclude", f"listfile:{exclude_file}"]
        return self._pipefrom(*args)

    def working_directory_clean(self, untracked=False, ignored=False):
        """Return whether `hg status` reports no entries.

        Modified, added, removed and deleted files are always considered;
        unknown and ignored files only when `untracked` / `ignored` are set.
        """
        args = ["status", "--modified", "--added", "--removed", "--deleted"]
        if untracked:
            args.append("--unknown")
        if ignored:
            args.append("--ignored")

        # If output is empty, there are no entries of requested status, which
        # means we are clean.
        return not self._run(*args).strip()

    def clean_directory(self, path: Union[str, Path]):
        """Revert tracked changes under `path` and delete untracked files there.

        Raises `CannotDeleteFromRootOfRepositoryException` when `path` is the
        repository root.
        """
        repo_root = Path(self.path)
        if repo_root.samefile(path):
            raise CannotDeleteFromRootOfRepositoryException()
        self._run("revert", str(path))
        for line in self._run("st", "-un", str(path)).splitlines():
            # Resolve against the repository root rather than the current
            # process cwd. Absolute paths are left unchanged by the join.
            # NOTE(review): assumes `hg status` reports root-relative paths.
            untracked = repo_root / line
            if untracked.is_file():
                untracked.unlink()
            else:
                shutil.rmtree(str(untracked))

    def update(self, ref):
        """Run `hg update --check` to `ref` and return the command output."""
        return self._run("update", "--check", ref)

    def raise_for_missing_extension(self, extension: str):
        """Raise `MissingVCSExtension` if `extension` is not installed and enabled."""
        try:
            self._run("showconfig", f"extensions.{extension}")
        except subprocess.CalledProcessError as e:
            raise MissingVCSExtension(extension) from e

    def push_to_try(
        self,
        message: str,
        changed_files: Optional[dict[str, str]] = None,
        allow_log_capture: bool = False,
    ):
        """Push the working directory to try using `hg push-to-try`.

        `changed_files`, when non-empty, is staged via `stage_changes` first.
        With `allow_log_capture` the command output is piped through
        `_push_to_try_with_log_capture`. Working directory changes are
        reverted afterwards regardless of the outcome.

        Raises `MissingVCSExtension` if the push fails and the `push-to-try`
        extension is not available, otherwise re-raises the
        `subprocess.CalledProcessError`.
        """
        if changed_files:
            self.stage_changes(changed_files)

        try:
            cmd = (str(self._tool), "push-to-try", "-m", message)
            if allow_log_capture:
                self._push_to_try_with_log_capture(
                    cmd,
                    {
                        "stdout": subprocess.PIPE,
                        "stderr": subprocess.PIPE,
                        "cwd": self.path,
                        "env": self._env,
                        "universal_newlines": True,
                        "bufsize": 1,
                    },
                )
            else:
                subprocess.check_call(
                    cmd,
                    cwd=self.path,
                    env=self._env,
                )
        except subprocess.CalledProcessError:
            self.raise_for_missing_extension("push-to-try")
            raise
        finally:
            self._run("revert", "-a")

    def get_commits(
        self,
        head: Optional[str] = None,
        base_ref: Optional[str] = None,
        limit: Optional[int] = None,
        follow: Optional[list[str]] = None,
    ) -> list[str]:
        """Return a list of commit SHAs for nodes on the current branch.

        The range runs from `base_ref` (exclusive, defaulting to
        `self.base_ref`) to `head` (defaulting to `self.head_ref`). `limit`
        caps the number of results and `follow` restricts the log to the
        given paths.
        """
        if not base_ref:
            base_ref = self.base_ref

        head_ref = head or self.head_ref

        cmd = [
            "log",
            "-r",
            f"{base_ref}::{head_ref} and not {base_ref}",
            "-T",
            "{node}\n",
        ]
        if limit is not None:
            cmd.append(f"-l{limit}")
        if follow is not None:
            cmd += ["-f", "--", *follow]

        return self._run(*cmd).splitlines()

    def get_commit_patches(self, nodes: list[str]) -> list[bytes]:
        """Return the patch for each of `nodes` in the VCS' standard format.

        Returns an empty list when `nodes` is empty.
        """
        # A bare `hg export` would export the working directory parent, so
        # short-circuit rather than return an unrequested patch.
        if not nodes:
            return []

        # Running `hg export` once for each commit in a large stack is
        # slow, so instead we run it once and parse the output for each
        # individual patch.
        args = ["export"]

        for node in nodes:
            args.extend(("-r", node))

        output = self._run(*args, encoding=None)

        patches = []

        current_patch = []
        for i, line in enumerate(output.splitlines(keepends=True)):
            if i != 0 and line.rstrip() == b"# HG changeset patch":
                # When we see the first line of a new patch, add the patch we have been
                # building to the patches list and start building a new patch.
                patches.append(b"".join(current_patch))
                current_patch = [line]
            else:
                # Add a new line to the patch being built.
                current_patch.append(line)

        # Add the last patch to the stack.
        patches.append(b"".join(current_patch))

        return patches

    @contextmanager
    def try_commit(
        self, commit_message: str, changed_files: Optional[dict[str, str]] = None
    ):
        """Create a temporary try commit as a context manager.

        Create a new commit using `commit_message` as the commit message. The commit
        may be empty, for example when only including try syntax.

        `changed_files` may contain a dict of file paths and their contents,
        see `stage_changes`.

        The commit is pruned when the context exits, including when the body
        of the `with` statement raises.
        """
        if changed_files:
            self.stage_changes(changed_files)

        # Allow empty commit messages in case we only use try-syntax.
        self._run("--config", "ui.allowemptycommit=1", "commit", "-m", commit_message)

        try:
            yield self.head_ref
        finally:
            # Always remove the temporary commit so a failure inside the
            # context does not leave it behind.
            try:
                self._run("prune", ".")
            except subprocess.CalledProcessError:
                # The `evolve` extension is required for `uncommit` and `prune`.
                self.raise_for_missing_extension("evolve")
                raise

    def get_last_modified_time_for_file(self, path: Path):
        """Return last modified in VCS time for the specified file."""
        out = self._run(
            "log",
            "--template",
            "{date|isodatesec}",
            "--limit",
            "1",
            "--follow",
            str(path),
        )

        return datetime.strptime(out.strip(), "%Y-%m-%d %H:%M:%S %z")

    def _update_mercurial_repo(self, url, dest: Path, revision):
        """Perform a clone/pull + update of a Mercurial repository.

        Pulls from `url` when `dest` exists, otherwise clones it there, then
        updates `dest` to `revision`. Raises `subprocess.CalledProcessError`
        if either command fails.
        """
        # Disable common extensions whose older versions may cause `hg`
        # invocations to abort.
        pull_args = [self._tool]
        if dest.exists():
            pull_args.extend(["pull", url])
            cwd = dest
        else:
            pull_args.extend(["clone", "--noupdate", url, str(dest)])
            cwd = "/"

        update_args = [self._tool, "update", "-r", revision]

        print("=" * 80)
        print(f"Ensuring {url} is up to date at {dest}")

        env = os.environ.copy()
        env.update({
            "HGPLAIN": "1",
            "HGRCPATH": "!",
        })

        try:
            subprocess.check_call(pull_args, cwd=str(cwd), env=env)
            subprocess.check_call(update_args, cwd=str(dest), env=env)
        finally:
            print("=" * 80)

    def _update_vct(self, root_state_dir: Path):
        """Ensure version-control-tools in the state directory is up to date."""
        vct_dir = root_state_dir / "version-control-tools"

        # Ensure the latest revision of version-control-tools is present.
        self._update_mercurial_repo(
            "https://hg.mozilla.org/hgcustom/version-control-tools", vct_dir, "@"
        )

        return vct_dir

    def configure(self, state_dir: Path, update_only: bool = False):
        """Run the Mercurial configuration wizard.

        version-control-tools is updated under `state_dir` first. With
        `update_only`, the wizard is invoked with an empty step list.
        """
        vct_dir = self._update_vct(state_dir)

        # Run the config wizard from v-c-t.
        args = [
            self._tool,
            "--config",
            f"extensions.configwizard={vct_dir}/hgext/configwizard",
            "configwizard",
        ]
        if update_only:
            args += ["--config", "configwizard.steps="]
        subprocess.call(args)