fmt.py (7530B)
import codecs
import logging
import os
import platform
import re
import shutil
import stat
import sys
from collections.abc import Iterable
from datetime import datetime, timedelta
from difflib import unified_diff
from subprocess import check_call

from compare_locales.merge import merge_channels
from compare_locales.paths.configparser import TOMLParser
from compare_locales.paths.files import ProjectFiles
from fluent.migrate.repo_client import RepoClient, git
from fluent.migrate.validator import Validator
from fluent.syntax import FluentParser, FluentSerializer
from mach.util import get_state_dir
from mozpack.path import join, normpath

L10N_SOURCE_NAME = "l10n-source"
L10N_SOURCE_REPO = "https://github.com/mozilla-l10n/firefox-l10n-source.git"

# Minimum age of the pull marker before the local clone is refreshed again.
PULL_AFTER = timedelta(days=2)


def handle_rmtree_error(func, path, exc_info):
    """
    Custom ``onerror`` handler for shutil.rmtree().

    Attempts to change file permissions and retry if ``os.unlink`` failed
    with a permission error; any other failure is re-raised unchanged.

    :param func: the function that raised (as passed by ``shutil.rmtree``).
    :param path: the path ``func`` was called with.
    :param exc_info: ``(type, value, traceback)`` tuple as returned by
        ``sys.exc_info()``.
    """
    exc_type, exc_value, exc_tb = exc_info
    # exc_info[0] is the exception *class*, so it has to be tested with
    # issubclass(); isinstance() on the class is always False and would
    # silently disable the retry below.
    if func == os.unlink and issubclass(exc_type, PermissionError):
        print(
            f"Permission error encountered for: {path}. Attempting to change permissions."
        )
        try:
            os.chmod(path, stat.S_IWRITE)  # Make the file writable
            func(path)  # Retry the removal
        except Exception as e:
            print(f"Failed to remove {path} even after changing permissions: {e}")
            raise  # Propagate the failure of the retry
    else:
        raise exc_value.with_traceback(exc_tb)


def remove_readonly(func, path, _):
    """Clear the readonly bit and reattempt the removal.

    Used as the ``onexc`` handler for shutil.rmtree(); the third argument
    (the exception) is ignored.
    """
    os.chmod(path, stat.S_IWRITE)
    func(path)


def inspect_migration(path):
    """Validate recipe and extract some metadata.

    Returns whatever ``Validator.validate`` returns for ``path``.
    """
    return Validator.validate(path)


def prepare_directories(cmd):
    """
    Ensure object dir exists,
    and that repo dir has a relatively up-to-date clone of l10n-source or gecko-strings.

    We run this once per mach invocation, for all tested migrations.

    :param cmd: object providing ``topobjdir``.
    :returns: ``(obj_dir, repo_dir)`` tuple of paths.
    :raises subprocess.CalledProcessError: if ``git pull`` / ``git clone`` fails.
    """
    obj_dir = join(cmd.topobjdir, "python", "l10n")
    os.makedirs(obj_dir, exist_ok=True)

    repo_dir = join(get_state_dir(), L10N_SOURCE_NAME)
    marker = join(repo_dir, ".git", "l10n_pull_marker")

    try:
        last_pull = datetime.fromtimestamp(os.stat(marker).st_mtime)
    except OSError:
        # No marker (or unreadable): treat the clone as stale or missing.
        skip_clone = False
    else:
        skip_clone = datetime.now() < last_pull + PULL_AFTER

    if not skip_clone:
        if os.path.exists(repo_dir):
            check_call(["git", "pull", L10N_SOURCE_REPO], cwd=repo_dir)
        else:
            check_call(["git", "clone", L10N_SOURCE_REPO, repo_dir])
        # Touch (and truncate) the marker so its mtime records this pull.
        with open(marker, "w"):
            pass

    return obj_dir, repo_dir


def diff_resources(left_path, right_path):
    """Write a unified diff of two Fluent files to stdout.

    Both files are parsed and re-serialized before diffing, so the diff
    compares the serializer's output rather than the raw file contents.
    """
    parser = FluentParser(with_spans=False)
    serializer = FluentSerializer(with_junk=True)
    lines = []
    for p in (left_path, right_path):
        with codecs.open(p, encoding="utf-8") as fh:
            res = parser.parse(fh.read())
            lines.append(serializer.serialize(res).splitlines(True))
    sys.stdout.writelines(unified_diff(lines[0], lines[1], left_path, right_path))


def _remove_tree(path):
    """Remove ``path`` recursively, clearing read-only bits on failure."""
    # 3.12 deprecated onerror and introduced onexc. Compare (major, minor) as
    # a tuple so that the check stays correct for any later X.Y version.
    major, minor = (int(part) for part in platform.python_version_tuple()[:2])
    if (major, minor) >= (3, 12):
        shutil.rmtree(path, onexc=remove_readonly)
    else:
        shutil.rmtree(path, onerror=handle_rmtree_error)


def _read_bytes_or_empty(path):
    """Return the binary content of ``path``, or ``b""`` if it does not exist."""
    if not os.path.exists(path):
        return b""
    with open(path, "rb") as fh:
        return fh.read()


def test_migration(
    cmd,
    obj_dir: str,
    repo_dir: str,
    to_test: str,
    references: Iterable[str],
):
    """Test the given recipe.

    This creates a workdir by l10n-merging gecko-strings and the m-c source,
    to mimic gecko-strings after the patch to test landed.
    It then runs the recipe with a gecko-strings clone as localization, both
    dry and wet.
    It inspects the generated commits, and shows a diff between the merged
    reference and the generated content.
    The diff is intended to be visually inspected. Some changes might be
    expected, in particular when formatting of the en-US strings is different.

    :param cmd: mach command object; ``topsrcdir``, ``substs``, ``log``,
        ``run_process`` and ``_virtualenv_manager`` are used.
    :param obj_dir: directory under which the per-migration workdir is created.
    :param repo_dir: local clone that gets cloned into the workdir as ``en-US``.
    :param to_test: path of the migration recipe file; it must contain an
        ``l10n`` path component.
    :param references: normalized reference file paths touched by the recipe.
    :returns: 0 on success, 1 if any check failed.
    :raises ValueError: if a reference path does not match the project
        configuration, or if ``to_test`` has no ``l10n`` path component.
    """
    rv = 0
    # Iterated twice below (merge, then diff); materialize so that one-shot
    # iterators are handled correctly.
    references = list(references)
    migration_name = os.path.splitext(os.path.split(to_test)[1])[0]
    work_dir = join(obj_dir, migration_name)

    paths = os.path.normpath(to_test).split(os.sep)
    # Migration modules should be in a sub-folder of l10n.
    migration_module = (
        ".".join(paths[paths.index("l10n") + 1 : -1]) + "." + migration_name
    )

    if os.path.exists(work_dir):
        _remove_tree(work_dir)

    os.makedirs(join(work_dir, "reference"))
    l10n_toml = join(cmd.topsrcdir, cmd.substs["MOZ_BUILD_APP"], "locales", "l10n.toml")
    pc = TOMLParser().parse(l10n_toml, env={"l10n_base": work_dir})
    pc.set_locales(["reference"])
    files = ProjectFiles("reference", [pc])
    ref_root = join(work_dir, "reference")
    for ref in references:
        if ref != normpath(ref):
            cmd.log(
                logging.ERROR,
                "fluent-migration-test",
                {"file": to_test, "ref": ref},
                'Reference path "{ref}" needs to be normalized for {file}',
            )
            rv = 1
            continue
        full_ref = join(ref_root, ref)
        m = files.match(full_ref)
        if m is None:
            raise ValueError("Bad reference path: " + ref)
        m_c_path = m[1]
        # NOTE(review): work_dir was just recreated and nothing visible in this
        # function populates work_dir/l10n-source before this point, so this
        # side of the merge looks like it is always empty -- confirm intent.
        g_s_path = join(work_dir, L10N_SOURCE_NAME, ref)
        resources = [_read_bytes_or_empty(f) for f in (g_s_path, m_c_path)]
        os.makedirs(os.path.dirname(full_ref), exist_ok=True)
        with open(full_ref, "wb") as fh:
            fh.write(merge_channels(ref, resources))

    l10n_root = join(work_dir, "en-US")
    git(work_dir, "clone", repo_dir, l10n_root)
    client = RepoClient(l10n_root)
    old_tip = client.head()
    run_migration = [
        cmd._virtualenv_manager.python_path,
        "-m",
        "fluent.migrate.tool",
        "--lang",
        "en-US",
        "--reference-dir",
        ref_root,
        "--localization-dir",
        l10n_root,
        "--dry-run",
        migration_module,
    ]
    cmd.run_process(run_migration, cwd=work_dir, line_handler=print)
    # drop --dry-run
    run_migration.pop(-2)
    cmd.run_process(run_migration, cwd=work_dir, line_handler=print)
    tip = client.head()
    if old_tip == tip:
        cmd.log(
            logging.WARN,
            "fluent-migration-test",
            {"file": to_test},
            "No migration applied for {file}",
        )
        return rv
    for ref in references:
        diff_resources(join(ref_root, ref), join(l10n_root, ref))
    messages = client.log(old_tip, tip)
    bug = re.search("[0-9]{5,}", migration_name)
    # Just check first message for bug number, they're all following the same pattern
    if bug is None or bug.group() not in messages[0]:
        rv = 1
        cmd.log(
            logging.ERROR,
            "fluent-migration-test",
            {"file": to_test},
            "Missing or wrong bug number for {file}",
        )
    if any(f"part {n + 1}" not in msg for n, msg in enumerate(messages)):
        rv = 1
        cmd.log(
            logging.ERROR,
            "fluent-migration-test",
            {"file": to_test},
            'Commit messages should have "part {{index}}" for {file}',
        )
    return rv