tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

plugin.py (17022B)


      1 """pytest-asyncio implementation."""
      2 import asyncio
      3 import contextlib
      4 import enum
      5 import functools
      6 import inspect
      7 import socket
      8 import sys
      9 import warnings
     10 from typing import (
     11    Any,
     12    AsyncIterator,
     13    Awaitable,
     14    Callable,
     15    Dict,
     16    Iterable,
     17    Iterator,
     18    List,
     19    Optional,
     20    Set,
     21    TypeVar,
     22    Union,
     23    cast,
     24    overload,
     25 )
     26 
     27 import pytest
     28 
     29 if sys.version_info >= (3, 8):
     30    from typing import Literal
     31 else:
     32    from typing_extensions import Literal
     33 
# Type of the value an async fixture provides; it parameterises the
# ``Awaitable[_R]`` / ``AsyncIterator[_R]`` bounds declared below.
_R = TypeVar("_R")

# Scope names accepted by the ``scope`` argument in the ``fixture`` overloads.
_ScopeName = Literal["session", "package", "module", "class", "function"]
# NOTE(review): ``_T`` is not referenced anywhere in the visible part of
# this module; confirm before removing.
_T = TypeVar("_T")

# Fixture implemented as a callable that returns an awaitable.
SimpleFixtureFunction = TypeVar(
    "SimpleFixtureFunction", bound=Callable[..., Awaitable[_R]]
)
# Fixture implemented as a callable that returns an async iterator.
FactoryFixtureFunction = TypeVar(
    "FactoryFixtureFunction", bound=Callable[..., AsyncIterator[_R]]
)
FixtureFunction = Union[SimpleFixtureFunction, FactoryFixtureFunction]
# A decorator: takes a fixture function and returns a fixture function.
FixtureFunctionMarker = Callable[[FixtureFunction], FixtureFunction]

# Annotation-only placeholders; each one is simply an alias of ``Any``.
Config = Any  # pytest < 7.0
PytestPluginManager = Any  # pytest < 7.0
FixtureDef = Any  # pytest < 7.0
Parser = Any  # pytest < 7.0
SubRequest = Any  # pytest < 7.0
     53 
     54 
class Mode(str, enum.Enum):
    """Accepted values of the ``asyncio_mode`` setting.

    Members are also ``str`` instances, so ``Mode("auto")`` looks a member
    up by its configured string and a member compares equal to that string.
    """

    AUTO = "auto"
    STRICT = "strict"
    LEGACY = "legacy"
     59 
     60 
# Warning instance handed to ``config.issue_config_time_warning`` by
# ``pytest_configure`` when the configured mode is ``Mode.LEGACY``.
LEGACY_MODE = DeprecationWarning(
    "The 'asyncio_mode' default value will change to 'strict' in future, "
    "please explicitly use 'asyncio_mode=strict' or 'asyncio_mode=auto' "
    "in pytest configuration file."
)

# Message template (``{name}`` is filled via ``str.format``) used for the
# per-fixture DeprecationWarning raised while preprocessing async fixtures
# in legacy mode.
LEGACY_ASYNCIO_FIXTURE = (
    "'@pytest.fixture' is applied to {name} "
    "in 'legacy' mode, "
    "please replace it with '@pytest_asyncio.fixture' as a preparation "
    "for switching to 'strict' mode (or use 'auto' mode to seamlessly handle "
    "all these fixtures as asyncio-driven)."
)


# Help text of the ``--asyncio-mode`` command-line option. The trailing
# backslashes join the physical lines, so each mode ends up on one line.
ASYNCIO_MODE_HELP = """\
'auto' - for automatically handling all async functions by the plugin
'strict' - for autoprocessing disabling (useful if different async frameworks \
should be tested together, e.g. \
both pytest-asyncio and pytest-trio are used in the same project)
'legacy' - for keeping compatibility with pytest-asyncio<0.17: \
auto-handling is disabled but pytest_asyncio.fixture usage is not enforced
"""
     84 
     85 
     86 def pytest_addoption(parser: Parser, pluginmanager: PytestPluginManager) -> None:
     87    group = parser.getgroup("asyncio")
     88    group.addoption(
     89        "--asyncio-mode",
     90        dest="asyncio_mode",
     91        default=None,
     92        metavar="MODE",
     93        help=ASYNCIO_MODE_HELP,
     94    )
     95    parser.addini(
     96        "asyncio_mode",
     97        help="default value for --asyncio-mode",
     98        default="strict",
     99    )
    100 
    101 
    102 @overload
    103 def fixture(
    104    fixture_function: FixtureFunction,
    105    *,
    106    scope: "Union[_ScopeName, Callable[[str, Config], _ScopeName]]" = ...,
    107    params: Optional[Iterable[object]] = ...,
    108    autouse: bool = ...,
    109    ids: Optional[
    110        Union[
    111            Iterable[Union[None, str, float, int, bool]],
    112            Callable[[Any], Optional[object]],
    113        ]
    114    ] = ...,
    115    name: Optional[str] = ...,
    116 ) -> FixtureFunction:
    117    ...
    118 
    119 
    120 @overload
    121 def fixture(
    122    fixture_function: None = ...,
    123    *,
    124    scope: "Union[_ScopeName, Callable[[str, Config], _ScopeName]]" = ...,
    125    params: Optional[Iterable[object]] = ...,
    126    autouse: bool = ...,
    127    ids: Optional[
    128        Union[
    129            Iterable[Union[None, str, float, int, bool]],
    130            Callable[[Any], Optional[object]],
    131        ]
    132    ] = ...,
    133    name: Optional[str] = None,
    134 ) -> FixtureFunctionMarker:
    135    ...
    136 
    137 
    138 def fixture(
    139    fixture_function: Optional[FixtureFunction] = None, **kwargs: Any
    140 ) -> Union[FixtureFunction, FixtureFunctionMarker]:
    141    if fixture_function is not None:
    142        _set_explicit_asyncio_mark(fixture_function)
    143        return pytest.fixture(fixture_function, **kwargs)
    144 
    145    else:
    146 
    147        @functools.wraps(fixture)
    148        def inner(fixture_function: FixtureFunction) -> FixtureFunction:
    149            return fixture(fixture_function, **kwargs)
    150 
    151        return inner
    152 
    153 
    154 def _has_explicit_asyncio_mark(obj: Any) -> bool:
    155    obj = getattr(obj, "__func__", obj)  # instance method maybe?
    156    return getattr(obj, "_force_asyncio_fixture", False)
    157 
    158 
    159 def _set_explicit_asyncio_mark(obj: Any) -> None:
    160    if hasattr(obj, "__func__"):
    161        # instance method, check the function object
    162        obj = obj.__func__
    163    obj._force_asyncio_fixture = True
    164 
    165 
    166 def _is_coroutine(obj: Any) -> bool:
    167    """Check to see if an object is really an asyncio coroutine."""
    168    return asyncio.iscoroutinefunction(obj)
    169 
    170 
    171 def _is_coroutine_or_asyncgen(obj: Any) -> bool:
    172    return _is_coroutine(obj) or inspect.isasyncgenfunction(obj)
    173 
    174 
    175 def _get_asyncio_mode(config: Config) -> Mode:
    176    val = config.getoption("asyncio_mode")
    177    if val is None:
    178        val = config.getini("asyncio_mode")
    179    return Mode(val)
    180 
    181 
    182 def pytest_configure(config: Config) -> None:
    183    """Inject documentation."""
    184    config.addinivalue_line(
    185        "markers",
    186        "asyncio: "
    187        "mark the test as a coroutine, it will be "
    188        "run using an asyncio event loop",
    189    )
    190    if _get_asyncio_mode(config) == Mode.LEGACY:
    191        config.issue_config_time_warning(LEGACY_MODE, stacklevel=2)
    192 
    193 
    194 @pytest.mark.tryfirst
    195 def pytest_report_header(config: Config) -> List[str]:
    196    """Add asyncio config to pytest header."""
    197    mode = _get_asyncio_mode(config)
    198    return [f"asyncio: mode={mode}"]
    199 
    200 
    201 def _preprocess_async_fixtures(config: Config, holder: Set[FixtureDef]) -> None:
    202    asyncio_mode = _get_asyncio_mode(config)
    203    fixturemanager = config.pluginmanager.get_plugin("funcmanage")
    204    for fixtures in fixturemanager._arg2fixturedefs.values():
    205        for fixturedef in fixtures:
    206            if fixturedef is holder:
    207                continue
    208            func = fixturedef.func
    209            if not _is_coroutine_or_asyncgen(func):
    210                # Nothing to do with a regular fixture function
    211                continue
    212            if not _has_explicit_asyncio_mark(func):
    213                if asyncio_mode == Mode.STRICT:
    214                    # Ignore async fixtures without explicit asyncio mark in strict mode
    215                    # This applies to pytest_trio fixtures, for example
    216                    continue
    217                elif asyncio_mode == Mode.AUTO:
    218                    # Enforce asyncio mode if 'auto'
    219                    _set_explicit_asyncio_mark(func)
    220                elif asyncio_mode == Mode.LEGACY:
    221                    _set_explicit_asyncio_mark(func)
    222                    try:
    223                        code = func.__code__
    224                    except AttributeError:
    225                        code = func.__func__.__code__
    226                    name = (
    227                        f"<fixture {func.__qualname__}, file={code.co_filename}, "
    228                        f"line={code.co_firstlineno}>"
    229                    )
    230                    warnings.warn(
    231                        LEGACY_ASYNCIO_FIXTURE.format(name=name),
    232                        DeprecationWarning,
    233                    )
    234 
    235            to_add = []
    236            for name in ("request", "event_loop"):
    237                if name not in fixturedef.argnames:
    238                    to_add.append(name)
    239 
    240            if to_add:
    241                fixturedef.argnames += tuple(to_add)
    242 
    243            if inspect.isasyncgenfunction(func):
    244                fixturedef.func = _wrap_asyncgen(func)
    245            elif inspect.iscoroutinefunction(func):
    246                fixturedef.func = _wrap_async(func)
    247 
    248            assert _has_explicit_asyncio_mark(fixturedef.func)
    249            holder.add(fixturedef)
    250 
    251 
    252 def _add_kwargs(
    253    func: Callable[..., Any],
    254    kwargs: Dict[str, Any],
    255    event_loop: asyncio.AbstractEventLoop,
    256    request: SubRequest,
    257 ) -> Dict[str, Any]:
    258    sig = inspect.signature(func)
    259    ret = kwargs.copy()
    260    if "request" in sig.parameters:
    261        ret["request"] = request
    262    if "event_loop" in sig.parameters:
    263        ret["event_loop"] = event_loop
    264    return ret
    265 
    266 
    267 def _wrap_asyncgen(func: Callable[..., AsyncIterator[_R]]) -> Callable[..., _R]:
    268    @functools.wraps(func)
    269    def _asyncgen_fixture_wrapper(
    270        event_loop: asyncio.AbstractEventLoop, request: SubRequest, **kwargs: Any
    271    ) -> _R:
    272        gen_obj = func(**_add_kwargs(func, kwargs, event_loop, request))
    273 
    274        async def setup() -> _R:
    275            res = await gen_obj.__anext__()
    276            return res
    277 
    278        def finalizer() -> None:
    279            """Yield again, to finalize."""
    280 
    281            async def async_finalizer() -> None:
    282                try:
    283                    await gen_obj.__anext__()
    284                except StopAsyncIteration:
    285                    pass
    286                else:
    287                    msg = "Async generator fixture didn't stop."
    288                    msg += "Yield only once."
    289                    raise ValueError(msg)
    290 
    291            event_loop.run_until_complete(async_finalizer())
    292 
    293        result = event_loop.run_until_complete(setup())
    294        request.addfinalizer(finalizer)
    295        return result
    296 
    297    return _asyncgen_fixture_wrapper
    298 
    299 
    300 def _wrap_async(func: Callable[..., Awaitable[_R]]) -> Callable[..., _R]:
    301    @functools.wraps(func)
    302    def _async_fixture_wrapper(
    303        event_loop: asyncio.AbstractEventLoop, request: SubRequest, **kwargs: Any
    304    ) -> _R:
    305        async def setup() -> _R:
    306            res = await func(**_add_kwargs(func, kwargs, event_loop, request))
    307            return res
    308 
    309        return event_loop.run_until_complete(setup())
    310 
    311    return _async_fixture_wrapper
    312 
    313 
# Module-level set passed to ``_preprocess_async_fixtures`` on every
# collection call; that function adds each fixture definition it wraps.
_HOLDER: Set[FixtureDef] = set()
    315 
    316 
    317 @pytest.mark.tryfirst
    318 def pytest_pycollect_makeitem(
    319    collector: Union[pytest.Module, pytest.Class], name: str, obj: object
    320 ) -> Union[
    321    None, pytest.Item, pytest.Collector, List[Union[pytest.Item, pytest.Collector]]
    322 ]:
    323    """A pytest hook to collect asyncio coroutines."""
    324    if not collector.funcnamefilter(name):
    325        return None
    326    _preprocess_async_fixtures(collector.config, _HOLDER)
    327    if isinstance(obj, staticmethod):
    328        # staticmethods need to be unwrapped.
    329        obj = obj.__func__
    330    if (
    331        _is_coroutine(obj)
    332        or _is_hypothesis_test(obj)
    333        and _hypothesis_test_wraps_coroutine(obj)
    334    ):
    335        item = pytest.Function.from_parent(collector, name=name)
    336        marker = item.get_closest_marker("asyncio")
    337        if marker is not None:
    338            return list(collector._genfunctions(name, obj))
    339        else:
    340            if _get_asyncio_mode(item.config) == Mode.AUTO:
    341                # implicitly add asyncio marker if asyncio mode is on
    342                ret = list(collector._genfunctions(name, obj))
    343                for elem in ret:
    344                    elem.add_marker("asyncio")
    345                return ret  # type: ignore[return-value]
    346    return None
    347 
    348 
    349 def _hypothesis_test_wraps_coroutine(function: Any) -> bool:
    350    return _is_coroutine(function.hypothesis.inner_test)
    351 
    352 
    353 @pytest.hookimpl(trylast=True)
    354 def pytest_fixture_post_finalizer(fixturedef: FixtureDef, request: SubRequest) -> None:
    355    """Called after fixture teardown"""
    356    if fixturedef.argname == "event_loop":
    357        policy = asyncio.get_event_loop_policy()
    358        try:
    359            loop = policy.get_event_loop()
    360        except RuntimeError:
    361            loop = None
    362        if loop is not None:
    363            # Clean up existing loop to avoid ResourceWarnings
    364            loop.close()
    365        new_loop = policy.new_event_loop()  # Replace existing event loop
    366        # Ensure subsequent calls to get_event_loop() succeed
    367        policy.set_event_loop(new_loop)
    368 
    369 
    370 @pytest.hookimpl(hookwrapper=True)
    371 def pytest_fixture_setup(
    372    fixturedef: FixtureDef, request: SubRequest
    373 ) -> Optional[object]:
    374    """Adjust the event loop policy when an event loop is produced."""
    375    if fixturedef.argname == "event_loop":
    376        outcome = yield
    377        loop = outcome.get_result()
    378        policy = asyncio.get_event_loop_policy()
    379        try:
    380            old_loop = policy.get_event_loop()
    381            if old_loop is not loop:
    382                old_loop.close()
    383        except RuntimeError:
    384            # Swallow this, since it's probably bad event loop hygiene.
    385            pass
    386        policy.set_event_loop(loop)
    387        return
    388 
    389    yield
    390 
    391 
    392 @pytest.hookimpl(tryfirst=True, hookwrapper=True)
    393 def pytest_pyfunc_call(pyfuncitem: pytest.Function) -> Optional[object]:
    394    """
    395    Pytest hook called before a test case is run.
    396 
    397    Wraps marked tests in a synchronous function
    398    where the wrapped test coroutine is executed in an event loop.
    399    """
    400    marker = pyfuncitem.get_closest_marker("asyncio")
    401    if marker is not None:
    402        funcargs: Dict[str, object] = pyfuncitem.funcargs  # type: ignore[name-defined]
    403        loop = cast(asyncio.AbstractEventLoop, funcargs["event_loop"])
    404        if _is_hypothesis_test(pyfuncitem.obj):
    405            pyfuncitem.obj.hypothesis.inner_test = wrap_in_sync(
    406                pyfuncitem,
    407                pyfuncitem.obj.hypothesis.inner_test,
    408                _loop=loop,
    409            )
    410        else:
    411            pyfuncitem.obj = wrap_in_sync(
    412                pyfuncitem,
    413                pyfuncitem.obj,
    414                _loop=loop,
    415            )
    416    yield
    417 
    418 
    419 def _is_hypothesis_test(function: Any) -> bool:
    420    return getattr(function, "is_hypothesis_test", False)
    421 
    422 
    423 def wrap_in_sync(
    424    pyfuncitem: pytest.Function,
    425    func: Callable[..., Awaitable[Any]],
    426    _loop: asyncio.AbstractEventLoop,
    427 ):
    428    """Return a sync wrapper around an async function executing it in the
    429    current event loop."""
    430 
    431    # if the function is already wrapped, we rewrap using the original one
    432    # not using __wrapped__ because the original function may already be
    433    # a wrapped one
    434    raw_func = getattr(func, "_raw_test_func", None)
    435    if raw_func is not None:
    436        func = raw_func
    437 
    438    @functools.wraps(func)
    439    def inner(*args, **kwargs):
    440        coro = func(*args, **kwargs)
    441        if not inspect.isawaitable(coro):
    442            pyfuncitem.warn(
    443                pytest.PytestWarning(
    444                    f"The test {pyfuncitem} is marked with '@pytest.mark.asyncio' "
    445                    "but it is not an async function. "
    446                    "Please remove asyncio marker. "
    447                    "If the test is not marked explicitly, "
    448                    "check for global markers applied via 'pytestmark'."
    449                )
    450            )
    451            return
    452        task = asyncio.ensure_future(coro, loop=_loop)
    453        try:
    454            _loop.run_until_complete(task)
    455        except BaseException:
    456            # run_until_complete doesn't get the result from exceptions
    457            # that are not subclasses of `Exception`. Consume all
    458            # exceptions to prevent asyncio's warning from logging.
    459            if task.done() and not task.cancelled():
    460                task.exception()
    461            raise
    462 
    463    inner._raw_test_func = func  # type: ignore[attr-defined]
    464    return inner
    465 
    466 
    467 def pytest_runtest_setup(item: pytest.Item) -> None:
    468    marker = item.get_closest_marker("asyncio")
    469    if marker is None:
    470        return
    471    fixturenames = item.fixturenames  # type: ignore[attr-defined]
    472    # inject an event loop fixture for all async tests
    473    if "event_loop" in fixturenames:
    474        fixturenames.remove("event_loop")
    475    fixturenames.insert(0, "event_loop")
    476    obj = getattr(item, "obj", None)
    477    if not getattr(obj, "hypothesis", False) and getattr(
    478        obj, "is_hypothesis_test", False
    479    ):
    480        pytest.fail(
    481            "test function `%r` is using Hypothesis, but pytest-asyncio "
    482            "only works with Hypothesis 3.64.0 or later." % item
    483        )
    484 
    485 
    486 @pytest.fixture
    487 def event_loop(request: "pytest.FixtureRequest") -> Iterator[asyncio.AbstractEventLoop]:
    488    """Create an instance of the default event loop for each test case."""
    489    loop = asyncio.get_event_loop_policy().new_event_loop()
    490    yield loop
    491    loop.close()
    492 
    493 
    494 def _unused_port(socket_type: int) -> int:
    495    """Find an unused localhost port from 1024-65535 and return it."""
    496    with contextlib.closing(socket.socket(type=socket_type)) as sock:
    497        sock.bind(("127.0.0.1", 0))
    498        return sock.getsockname()[1]
    499 
    500 
    501 @pytest.fixture
    502 def unused_tcp_port() -> int:
    503    return _unused_port(socket.SOCK_STREAM)
    504 
    505 
    506 @pytest.fixture
    507 def unused_udp_port() -> int:
    508    return _unused_port(socket.SOCK_DGRAM)
    509 
    510 
    511 @pytest.fixture(scope="session")
    512 def unused_tcp_port_factory() -> Callable[[], int]:
    513    """A factory function, producing different unused TCP ports."""
    514    produced = set()
    515 
    516    def factory():
    517        """Return an unused port."""
    518        port = _unused_port(socket.SOCK_STREAM)
    519 
    520        while port in produced:
    521            port = _unused_port(socket.SOCK_STREAM)
    522 
    523        produced.add(port)
    524 
    525        return port
    526 
    527    return factory
    528 
    529 
    530 @pytest.fixture(scope="session")
    531 def unused_udp_port_factory() -> Callable[[], int]:
    532    """A factory function, producing different unused UDP ports."""
    533    produced = set()
    534 
    535    def factory():
    536        """Return an unused port."""
    537        port = _unused_port(socket.SOCK_DGRAM)
    538 
    539        while port in produced:
    540            port = _unused_port(socket.SOCK_DGRAM)
    541 
    542        produced.add(port)
    543 
    544        return port
    545 
    546    return factory