tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

wpt.py (7978B)


      1 # mypy: allow-untyped-defs
      2 
      3 import argparse
      4 import json
      5 import logging
      6 import multiprocessing
      7 import os
      8 import sys
      9 
     10 from tools import localpaths  # noqa: F401
     11 
     12 from . import virtualenv
     13 
     14 
     15 here = os.path.dirname(__file__)
     16 wpt_root = os.path.abspath(os.path.join(here, os.pardir, os.pardir))
     17 
     18 
     19 def load_conditional_requirements(props, base_dir):
     20    """Load conditional requirements from commands.json."""
     21 
     22    conditional_requirements = props.get("conditional_requirements")
     23    if not conditional_requirements:
     24        return {}
     25 
     26    commandline_flag_requirements = {}
     27    for key, value in conditional_requirements.items():
     28        if key == "commandline_flag":
     29            for flag_name, requirements_paths in value.items():
     30                commandline_flag_requirements[flag_name] = [
     31                    os.path.join(base_dir, path) for path in requirements_paths]
     32        else:
     33            raise KeyError(
     34                f'Unsupported conditional requirement key: {key}')
     35 
     36    return {
     37        "commandline_flag": commandline_flag_requirements,
     38    }
     39 
     40 
     41 def load_commands():
     42    rv = {}
     43    with open(os.path.join(here, "paths")) as f:
     44        paths = [item.strip().replace("/", os.path.sep) for item in f if item.strip()]
     45    for path in paths:
     46        abs_path = os.path.join(wpt_root, path, "commands.json")
     47        base_dir = os.path.dirname(abs_path)
     48        with open(abs_path) as f:
     49            data = json.load(f)
     50            for command, props in data.items():
     51                assert "path" in props
     52                assert "script" in props
     53                rv[command] = {
     54                    "path": os.path.join(base_dir, props["path"]),
     55                    "script": props["script"],
     56                    "parser": props.get("parser"),
     57                    "parse_known": props.get("parse_known", False),
     58                    "help": props.get("help"),
     59                    "virtualenv": props.get("virtualenv", True),
     60                    "requirements": [os.path.join(base_dir, item)
     61                                     for item in props.get("requirements", [])]
     62                }
     63 
     64                rv[command]["conditional_requirements"] = load_conditional_requirements(
     65                    props, base_dir)
     66 
     67                if rv[command]["requirements"] or rv[command]["conditional_requirements"]:
     68                    assert rv[command]["virtualenv"]
     69    return rv
     70 
     71 
     72 def parse_args(argv, commands=load_commands()):
     73    parser = argparse.ArgumentParser()
     74    parser.add_argument("--venv", help="Path to an existing virtualenv to use")
     75    parser.add_argument("--skip-venv-setup", action="store_true",
     76                        help="Whether to use the virtualenv as-is. Must set --venv as well")
     77    parser.add_argument("--debug", action="store_true",
     78                        help="Run the debugger in case of an exception")
     79    subparsers = parser.add_subparsers(dest="command")
     80    for command, props in commands.items():
     81        subparsers.add_parser(command, help=props["help"], add_help=False)
     82 
     83    if not argv:
     84        parser.print_help()
     85        return None, None
     86 
     87    args, extra = parser.parse_known_args(argv)
     88 
     89    return args, extra
     90 
     91 
     92 def import_command(prog, command, props):
     93    # This currently requires the path to be a module,
     94    # which probably isn't ideal but it means that relative
     95    # imports inside the script work
     96    rel_path = os.path.relpath(props["path"], wpt_root)
     97 
     98    parts = os.path.splitext(rel_path)[0].split(os.path.sep)
     99 
    100    mod_name = ".".join(parts)
    101 
    102    mod = __import__(mod_name)
    103    for part in parts[1:]:
    104        mod = getattr(mod, part)
    105 
    106    script = getattr(mod, props["script"])
    107    if props["parser"] is not None:
    108        parser = getattr(mod, props["parser"])()
    109        parser.prog = f"{os.path.basename(prog)} {command}"
    110    else:
    111        parser = None
    112 
    113    return script, parser
    114 
    115 
    116 def create_complete_parser():
    117    """Eagerly load all subparsers. This involves more work than is required
    118    for typical command-line usage. It is maintained for the purposes of
    119    documentation generation as implemented in WPT's top-level `/docs`
    120    directory."""
    121 
    122    commands = load_commands()
    123    parser = argparse.ArgumentParser()
    124    subparsers = parser.add_subparsers()
    125 
    126    # We should already be in a virtual environment from the top-level
    127    # `wpt build-docs` command but we need to look up the environment to
    128    # find out where it's located.
    129    venv_path = os.environ["VIRTUAL_ENV"]
    130    venv = virtualenv.Virtualenv(venv_path, True)
    131 
    132    for command in commands:
    133        props = commands[command]
    134 
    135        try:
    136            venv.install_requirements(*props.get("requirements", []))
    137        except Exception:
    138            logging.warning(
    139                f"Unable to install requirements ({props['requirements']!r}) for command {command}"
    140            )
    141            continue
    142 
    143 
    144        subparser = import_command('wpt', command, props)[1]
    145        if not subparser:
    146            continue
    147 
    148        subparsers.add_parser(command,
    149                              help=props["help"],
    150                              add_help=False,
    151                              parents=[subparser])
    152 
    153    return parser
    154 
    155 
    156 def venv_dir():
    157    return f"_venv{sys.version_info[0]}"
    158 
    159 
    160 def setup_virtualenv(path, skip_venv_setup, props):
    161    if skip_venv_setup and path is None:
    162        raise ValueError("Must set --venv when --skip-venv-setup is used")
    163    should_skip_setup = path is not None and skip_venv_setup
    164    if path is None:
    165        path = os.path.join(wpt_root, venv_dir())
    166    venv = virtualenv.Virtualenv(path, should_skip_setup)
    167    if not should_skip_setup:
    168        venv.start()
    169        venv.install_requirements(*props.get("requirements", []))
    170    return venv
    171 
    172 
    173 def install_command_flag_requirements(venv, props, kwargs):
    174    requirements = props["conditional_requirements"].get("commandline_flag", {})
    175    install_paths = []
    176    for command_flag_name, requirement_paths in requirements.items():
    177        if command_flag_name in kwargs:
    178            install_paths.extend(requirement_paths)
    179    venv.install_requirements(*install_paths)
    180 
    181 
    182 def main(prog=None, argv=None):
    183    logging.basicConfig(level=logging.INFO)
    184    # Ensure we use the spawn start method for all multiprocessing
    185    try:
    186        multiprocessing.set_start_method('spawn')
    187    except RuntimeError as e:
    188        # This can happen if we call back into wpt having already set the context
    189        start_method = multiprocessing.get_start_method()
    190        if start_method != "spawn":
    191            logging.critical("The multiprocessing start method was set to %s by a caller", start_method)
    192            raise e
    193 
    194    if prog is None:
    195        prog = sys.argv[0]
    196    if argv is None:
    197        argv = sys.argv[1:]
    198 
    199    commands = load_commands()
    200 
    201    main_args, command_args = parse_args(argv, commands)
    202 
    203    if not main_args:
    204        return
    205 
    206    command = main_args.command
    207    props = commands[command]
    208    venv = None
    209    if props["virtualenv"]:
    210        venv = setup_virtualenv(main_args.venv, main_args.skip_venv_setup, props)
    211    script, parser = import_command(prog, command, props)
    212    if parser:
    213        if props["parse_known"]:
    214            kwargs, extras = parser.parse_known_args(command_args)
    215            extras = (extras,)
    216            kwargs = vars(kwargs)
    217        else:
    218            extras = ()
    219            kwargs = vars(parser.parse_args(command_args))
    220    else:
    221        extras = ()
    222        kwargs = {}
    223 
    224    if venv is not None:
    225        if not main_args.skip_venv_setup:
    226            install_command_flag_requirements(venv, props, kwargs)
    227        args = (venv,) + extras
    228    else:
    229        args = extras
    230 
    231    if script:
    232        try:
    233            rv = script(*args, **kwargs)
    234            if rv is not None:
    235                sys.exit(int(rv))
    236        except Exception:
    237            if main_args.debug:
    238                import pdb
    239                pdb.post_mortem()
    240            else:
    241                raise
    242    sys.exit(0)
    243 
    244 
# Run the command-line entry point when this file is executed as a script.
if __name__ == "__main__":
    main()  # type: ignore