# plugin.py (17022B)
"""pytest-asyncio implementation."""
import asyncio
import contextlib
import enum
import functools
import inspect
import socket
import sys
import warnings
from typing import (
    Any,
    AsyncIterator,
    Awaitable,
    Callable,
    Dict,
    Iterable,
    Iterator,
    List,
    Optional,
    Set,
    TypeVar,
    Union,
    cast,
    overload,
)

import pytest

# ``Literal`` is taken from ``typing`` on Python 3.8+, and from the
# ``typing_extensions`` package on older interpreters.
if sys.version_info >= (3, 8):
    from typing import Literal
else:
    from typing_extensions import Literal

# Result type produced by an async fixture function.
_R = TypeVar("_R")

# The fixture scope names accepted by the ``scope`` argument of ``fixture``.
_ScopeName = Literal["session", "package", "module", "class", "function"]
_T = TypeVar("_T")

# Fixture written as a callable returning an awaitable.
SimpleFixtureFunction = TypeVar(
    "SimpleFixtureFunction", bound=Callable[..., Awaitable[_R]]
)
# Fixture written as a callable returning an async iterator.
FactoryFixtureFunction = TypeVar(
    "FactoryFixtureFunction", bound=Callable[..., AsyncIterator[_R]]
)
FixtureFunction = Union[SimpleFixtureFunction, FactoryFixtureFunction]
# Shape of the decorator returned when ``fixture`` is called with keyword
# arguments only.
FixtureFunctionMarker = Callable[[FixtureFunction], FixtureFunction]

# The aliases below are plain ``Any`` placeholders used only in annotations.
# NOTE(review): the trailing comments suggest the real types are not
# importable from public pytest before 7.0 - confirm.
Config = Any  # pytest < 7.0
PytestPluginManager = Any  # pytest < 7.0
FixtureDef = Any  # pytest < 7.0
Parser = Any  # pytest < 7.0
SubRequest = Any  # pytest < 7.0


class Mode(str, enum.Enum):
    """Values accepted for the ``asyncio_mode`` option / ini setting.

    The meaning of each value is spelled out in ``ASYNCIO_MODE_HELP``.
    """

    AUTO = "auto"
    STRICT = "strict"
    LEGACY = "legacy"


# Warning instance passed to ``config.issue_config_time_warning`` when the
# resolved mode is ``Mode.LEGACY``.
# NOTE(review): the text says the default "will change to 'strict'", while the
# ini default registered in ``pytest_addoption`` is already "strict" - confirm
# the wording is still intended.
LEGACY_MODE = DeprecationWarning(
    "The 'asyncio_mode' default value will change to 'strict' in future, "
    "please explicitly use 'asyncio_mode=strict' or 'asyncio_mode=auto' "
    "in pytest configuration file."
)

# Message template for the per-fixture deprecation warning raised in legacy
# mode; ``{name}`` is filled in with a description of the offending fixture.
LEGACY_ASYNCIO_FIXTURE = (
    "'@pytest.fixture' is applied to {name} "
    "in 'legacy' mode, "
    "please replace it with '@pytest_asyncio.fixture' as a preparation "
    "for switching to 'strict' mode (or use 'auto' mode to seamlessly handle "
    "all these fixtures as asyncio-driven)."
)


# Help text shown for the ``--asyncio-mode`` command line option.  The
# backslashes are line continuations inside the string literal, so the
# wrapped source lines are joined without a newline.
ASYNCIO_MODE_HELP = """\
'auto' - for automatically handling all async functions by the plugin
'strict' - for autoprocessing disabling (useful if different async frameworks \
should be tested together, e.g. \
both pytest-asyncio and pytest-trio are used in the same project)
'legacy' - for keeping compatibility with pytest-asyncio<0.17: \
auto-handling is disabled but pytest_asyncio.fixture usage is not enforced
"""


def pytest_addoption(parser: Parser, pluginmanager: PytestPluginManager) -> None:
    """Register the ``--asyncio-mode`` option and the ``asyncio_mode`` ini key.

    The command line option defaults to ``None`` so that an unset option can
    be told apart from an explicit choice; the ini key defaults to
    ``"strict"``.  ``pluginmanager`` is accepted but not used here.
    """
    group = parser.getgroup("asyncio")
    group.addoption(
        "--asyncio-mode",
        dest="asyncio_mode",
        default=None,
        metavar="MODE",
        help=ASYNCIO_MODE_HELP,
    )
    parser.addini(
        "asyncio_mode",
        help="default value for --asyncio-mode",
        default="strict",
    )


# Typing overload: ``fixture`` applied directly to a fixture function returns
# that function's type.
@overload
def fixture(
    fixture_function: FixtureFunction,
    *,
    scope: "Union[_ScopeName, Callable[[str, Config], _ScopeName]]" = ...,
    params: Optional[Iterable[object]] = ...,
    autouse: bool = ...,
    ids: Optional[
        Union[
            Iterable[Union[None, str, float, int, bool]],
            Callable[[Any], Optional[object]],
        ]
    ] = ...,
    name: Optional[str] = ...,
) -> FixtureFunction:
    ...


# Typing overload: ``fixture`` called with keyword arguments only returns a
# decorator to be applied to the fixture function.
@overload
def fixture(
    fixture_function: None = ...,
    *,
    scope: "Union[_ScopeName, Callable[[str, Config], _ScopeName]]" = ...,
    params: Optional[Iterable[object]] = ...,
    autouse: bool = ...,
    ids: Optional[
        Union[
            Iterable[Union[None, str, float, int, bool]],
            Callable[[Any], Optional[object]],
        ]
    ] = ...,
    name: Optional[str] = None,
) -> FixtureFunctionMarker:
    ...


def fixture(
    fixture_function: Optional[FixtureFunction] = None, **kwargs: Any
) -> Union[FixtureFunction, FixtureFunctionMarker]:
    """Declare a fixture that this plugin always treats as asyncio-driven.

    Works both bare (``@fixture``) and with keyword arguments
    (``@fixture(scope=...)``).  The decorated function is tagged with the
    explicit asyncio mark and then passed to ``pytest.fixture`` together with
    ``kwargs``.

    Returns the result of ``pytest.fixture`` when a function is given,
    otherwise a decorator that applies ``fixture`` with the stored ``kwargs``.
    """
    if fixture_function is not None:
        _set_explicit_asyncio_mark(fixture_function)
        return pytest.fixture(fixture_function, **kwargs)

    @functools.wraps(fixture)
    def inner(fixture_function: FixtureFunction) -> FixtureFunction:
        return fixture(fixture_function, **kwargs)

    return inner


def _has_explicit_asyncio_mark(obj: Any) -> bool:
    """Return whether ``obj`` carries the ``_force_asyncio_fixture`` flag."""
    obj = getattr(obj, "__func__", obj)  # instance method maybe?
    return getattr(obj, "_force_asyncio_fixture", False)


def _set_explicit_asyncio_mark(obj: Any) -> None:
    """Set the ``_force_asyncio_fixture`` flag on ``obj``.

    For a bound method the flag is stored on the underlying function.
    """
    if hasattr(obj, "__func__"):
        # instance method, check the function object
        obj = obj.__func__
    obj._force_asyncio_fixture = True


def _is_coroutine(obj: Any) -> bool:
    """Check to see if an object is really an asyncio coroutine."""
    return asyncio.iscoroutinefunction(obj)


def _is_coroutine_or_asyncgen(obj: Any) -> bool:
    """Return True for coroutine functions and async generator functions."""
    return _is_coroutine(obj) or inspect.isasyncgenfunction(obj)


def _get_asyncio_mode(config: Config) -> Mode:
    """Resolve the asyncio mode: command line option first, ini value second.

    Raises ``ValueError`` (from the ``Mode`` constructor) when the configured
    value is not one of the ``Mode`` members.
    """
    val = config.getoption("asyncio_mode")
    if val is None:
        val = config.getini("asyncio_mode")
    return Mode(val)


def pytest_configure(config: Config) -> None:
    """Inject documentation."""
    config.addinivalue_line(
        "markers",
        "asyncio: "
        "mark the test as a coroutine, it will be "
        "run using an asyncio event loop",
    )
    if _get_asyncio_mode(config) == Mode.LEGACY:
        config.issue_config_time_warning(LEGACY_MODE, stacklevel=2)


@pytest.mark.tryfirst
def pytest_report_header(config: Config) -> List[str]:
    """Add asyncio config to pytest header."""
    mode = _get_asyncio_mode(config)
    return [f"asyncio: mode={mode}"]


def _preprocess_async_fixtures(config: Config, holder: Set[FixtureDef]) -> None:
    """Wrap every async fixture known to the fixture manager in a sync runner.

    ``holder`` records the fixture definitions that were already wrapped, so
    that repeated calls (this runs once per collected name) skip them.

    Fixtures without the explicit asyncio mark are skipped in strict mode,
    marked silently in auto mode, and marked with a ``DeprecationWarning`` in
    legacy mode.
    """
    asyncio_mode = _get_asyncio_mode(config)
    fixturemanager = config.pluginmanager.get_plugin("funcmanage")
    for fixtures in fixturemanager._arg2fixturedefs.values():
        for fixturedef in fixtures:
            # Membership test, not identity: a fixturedef is never the set
            # itself, so ``is holder`` could never skip anything.
            if fixturedef in holder:
                continue
            func = fixturedef.func
            if not _is_coroutine_or_asyncgen(func):
                # Nothing to do with a regular fixture function
                continue
            if not _has_explicit_asyncio_mark(func):
                if asyncio_mode == Mode.STRICT:
                    # Ignore async fixtures without explicit asyncio mark in strict mode
                    # This applies to pytest_trio fixtures, for example
                    continue
                elif asyncio_mode == Mode.AUTO:
                    # Enforce asyncio mode if 'auto'
                    _set_explicit_asyncio_mark(func)
                elif asyncio_mode == Mode.LEGACY:
                    _set_explicit_asyncio_mark(func)
                    try:
                        code = func.__code__
                    except AttributeError:
                        # Bound methods keep the code object on ``__func__``.
                        code = func.__func__.__code__
                    name = (
                        f"<fixture {func.__qualname__}, file={code.co_filename}, "
                        f"line={code.co_firstlineno}>"
                    )
                    warnings.warn(
                        LEGACY_ASYNCIO_FIXTURE.format(name=name),
                        DeprecationWarning,
                    )

            # The sync wrappers take ``event_loop`` and ``request`` as
            # parameters, so both must be requested by the fixture.
            to_add = tuple(
                name
                for name in ("request", "event_loop")
                if name not in fixturedef.argnames
            )
            if to_add:
                fixturedef.argnames += to_add

            if inspect.isasyncgenfunction(func):
                fixturedef.func = _wrap_asyncgen(func)
            elif inspect.iscoroutinefunction(func):
                fixturedef.func = _wrap_async(func)

            assert _has_explicit_asyncio_mark(fixturedef.func)
            holder.add(fixturedef)


def _add_kwargs(
    func: Callable[..., Any],
    kwargs: Dict[str, Any],
    event_loop: asyncio.AbstractEventLoop,
    request: SubRequest,
) -> Dict[str, Any]:
    """Return a copy of ``kwargs`` extended for calling ``func``.

    ``request`` and ``event_loop`` are added only when ``func`` declares a
    parameter of that name; ``kwargs`` itself is left untouched.
    """
    sig = inspect.signature(func)
    ret = kwargs.copy()
    if "request" in sig.parameters:
        ret["request"] = request
    if "event_loop" in sig.parameters:
        ret["event_loop"] = event_loop
    return ret


def _wrap_asyncgen(func: Callable[..., AsyncIterator[_R]]) -> Callable[..., _R]:
    """Wrap an async generator fixture into a synchronous fixture function.

    The wrapper runs the generator up to its first ``yield`` on ``event_loop``
    and returns the yielded value.  A finalizer registered on ``request``
    resumes the generator once more and raises ``ValueError`` if it yields a
    second time.
    """

    @functools.wraps(func)
    def _asyncgen_fixture_wrapper(
        event_loop: asyncio.AbstractEventLoop, request: SubRequest, **kwargs: Any
    ) -> _R:
        gen_obj = func(**_add_kwargs(func, kwargs, event_loop, request))

        async def setup() -> _R:
            res = await gen_obj.__anext__()
            return res

        def finalizer() -> None:
            """Yield again, to finalize."""

            async def async_finalizer() -> None:
                try:
                    await gen_obj.__anext__()
                except StopAsyncIteration:
                    pass
                else:
                    # The two sentences used to be joined without a space.
                    msg = "Async generator fixture didn't stop. Yield only once."
                    raise ValueError(msg)

            event_loop.run_until_complete(async_finalizer())

        result = event_loop.run_until_complete(setup())
        # Registered only after setup succeeded.
        request.addfinalizer(finalizer)
        return result

    return _asyncgen_fixture_wrapper


def _wrap_async(func: Callable[..., Awaitable[_R]]) -> Callable[..., _R]:
    """Wrap a coroutine fixture into a synchronous fixture function.

    The wrapper awaits ``func`` to completion on ``event_loop`` and returns
    its result.
    """

    @functools.wraps(func)
    def _async_fixture_wrapper(
        event_loop: asyncio.AbstractEventLoop, request: SubRequest, **kwargs: Any
    ) -> _R:
        async def setup() -> _R:
            res = await func(**_add_kwargs(func, kwargs, event_loop, request))
            return res

        return event_loop.run_until_complete(setup())

    return _async_fixture_wrapper


# Fixture definitions already wrapped by ``_preprocess_async_fixtures``.
_HOLDER: Set[FixtureDef] = set()


@pytest.mark.tryfirst
def pytest_pycollect_makeitem(
    collector: Union[pytest.Module, pytest.Class], name: str, obj: object
) -> Union[
    None, pytest.Item, pytest.Collector, List[Union[pytest.Item, pytest.Collector]]
]:
    """A pytest hook to collect asyncio coroutines.

    Returns the generated test functions for coroutine tests (and Hypothesis
    tests wrapping a coroutine) that carry the ``asyncio`` marker, or for all
    of them in auto mode, where the marker is added implicitly.  Returns
    ``None`` in every other case.
    """
    if not collector.funcnamefilter(name):
        return None
    _preprocess_async_fixtures(collector.config, _HOLDER)
    if isinstance(obj, staticmethod):
        # staticmethods need to be unwrapped.
        obj = obj.__func__
    if _is_coroutine(obj) or (
        _is_hypothesis_test(obj) and _hypothesis_test_wraps_coroutine(obj)
    ):
        item = pytest.Function.from_parent(collector, name=name)
        marker = item.get_closest_marker("asyncio")
        if marker is not None:
            return list(collector._genfunctions(name, obj))
        if _get_asyncio_mode(item.config) == Mode.AUTO:
            # implicitly add asyncio marker if asyncio mode is on
            ret = list(collector._genfunctions(name, obj))
            for elem in ret:
                elem.add_marker("asyncio")
            return ret  # type: ignore[return-value]
    return None


def _hypothesis_test_wraps_coroutine(function: Any) -> bool:
    """Return whether the inner test of a Hypothesis test is a coroutine."""
    return _is_coroutine(function.hypothesis.inner_test)


@pytest.hookimpl(trylast=True)
def pytest_fixture_post_finalizer(fixturedef: FixtureDef, request: SubRequest) -> None:
    """Called after fixture teardown.

    For the ``event_loop`` fixture: close the loop currently returned by the
    event loop policy (if one can be obtained) and install a fresh loop.
    """
    if fixturedef.argname == "event_loop":
        policy = asyncio.get_event_loop_policy()
        try:
            loop = policy.get_event_loop()
        except RuntimeError:
            loop = None
        if loop is not None:
            # Clean up existing loop to avoid ResourceWarnings
            loop.close()
        new_loop = policy.new_event_loop()  # Replace existing event loop
        # Ensure subsequent calls to get_event_loop() succeed
        policy.set_event_loop(new_loop)


@pytest.hookimpl(hookwrapper=True)
def pytest_fixture_setup(
    fixturedef: FixtureDef, request: SubRequest
) -> Optional[object]:
    """Adjust the event loop policy when an event loop is produced."""
    if fixturedef.argname == "event_loop":
        outcome = yield
        loop = outcome.get_result()
        policy = asyncio.get_event_loop_policy()
        try:
            old_loop = policy.get_event_loop()
            if old_loop is not loop:
                old_loop.close()
        except RuntimeError:
            # Swallow this, since it's probably bad event loop hygiene.
            pass
        policy.set_event_loop(loop)
        return

    yield


@pytest.hookimpl(tryfirst=True, hookwrapper=True)
def pytest_pyfunc_call(pyfuncitem: pytest.Function) -> Optional[object]:
    """
    Pytest hook called before a test case is run.

    Wraps marked tests in a synchronous function
    where the wrapped test coroutine is executed in an event loop.
    """
    marker = pyfuncitem.get_closest_marker("asyncio")
    if marker is not None:
        funcargs: Dict[str, object] = pyfuncitem.funcargs  # type: ignore[name-defined]
        loop = cast(asyncio.AbstractEventLoop, funcargs["event_loop"])
        if _is_hypothesis_test(pyfuncitem.obj):
            # For Hypothesis tests the inner test is what gets wrapped.
            pyfuncitem.obj.hypothesis.inner_test = wrap_in_sync(
                pyfuncitem,
                pyfuncitem.obj.hypothesis.inner_test,
                _loop=loop,
            )
        else:
            pyfuncitem.obj = wrap_in_sync(
                pyfuncitem,
                pyfuncitem.obj,
                _loop=loop,
            )
    yield


def _is_hypothesis_test(function: Any) -> bool:
    """Return the object's ``is_hypothesis_test`` attribute, default False."""
    return getattr(function, "is_hypothesis_test", False)


def wrap_in_sync(
    pyfuncitem: pytest.Function,
    func: Callable[..., Awaitable[Any]],
    _loop: asyncio.AbstractEventLoop,
) -> Callable[..., None]:
    """Return a sync wrapper around an async function executing it in the
    current event loop.

    If calling ``func`` does not produce an awaitable, the wrapper emits a
    ``PytestWarning`` through ``pyfuncitem`` and returns without running
    anything on the loop.
    """

    # if the function is already wrapped, we rewrap using the original one
    # not using __wrapped__ because the original function may already be
    # a wrapped one
    raw_func = getattr(func, "_raw_test_func", None)
    if raw_func is not None:
        func = raw_func

    @functools.wraps(func)
    def inner(*args: Any, **kwargs: Any) -> None:
        coro = func(*args, **kwargs)
        if not inspect.isawaitable(coro):
            pyfuncitem.warn(
                pytest.PytestWarning(
                    f"The test {pyfuncitem} is marked with '@pytest.mark.asyncio' "
                    "but it is not an async function. "
                    "Please remove asyncio marker. "
                    "If the test is not marked explicitly, "
                    "check for global markers applied via 'pytestmark'."
                )
            )
            return
        task = asyncio.ensure_future(coro, loop=_loop)
        try:
            _loop.run_until_complete(task)
        except BaseException:
            # run_until_complete doesn't get the result from exceptions
            # that are not subclasses of `Exception`. Consume all
            # exceptions to prevent asyncio's warning from logging.
            if task.done() and not task.cancelled():
                task.exception()
            raise

    inner._raw_test_func = func  # type: ignore[attr-defined]
    return inner


def pytest_runtest_setup(item: pytest.Item) -> None:
    """Prepare an asyncio-marked test item before it runs.

    Moves (or adds) ``event_loop`` to the front of the item's fixture names,
    and fails the test when its object is flagged ``is_hypothesis_test`` but
    has no truthy ``hypothesis`` attribute.
    """
    marker = item.get_closest_marker("asyncio")
    if marker is None:
        return
    fixturenames = item.fixturenames  # type: ignore[attr-defined]
    # inject an event loop fixture for all async tests
    if "event_loop" in fixturenames:
        fixturenames.remove("event_loop")
    fixturenames.insert(0, "event_loop")
    obj = getattr(item, "obj", None)
    if not getattr(obj, "hypothesis", False) and getattr(
        obj, "is_hypothesis_test", False
    ):
        pytest.fail(
            "test function `%r` is using Hypothesis, but pytest-asyncio "
            "only works with Hypothesis 3.64.0 or later." % item
        )


@pytest.fixture
def event_loop(request: "pytest.FixtureRequest") -> Iterator[asyncio.AbstractEventLoop]:
    """Create an instance of the default event loop for each test case."""
    loop = asyncio.get_event_loop_policy().new_event_loop()
    yield loop
    loop.close()


def _unused_port(socket_type: int) -> int:
    """Return a currently unused port on 127.0.0.1 for ``socket_type``.

    A socket is bound to port 0 and the port number reported by
    ``getsockname`` is returned; the socket is closed before returning, so
    the port is free for the caller but not reserved.
    """
    with contextlib.closing(socket.socket(type=socket_type)) as sock:
        sock.bind(("127.0.0.1", 0))
        return sock.getsockname()[1]


def _make_unused_port_factory(socket_type: int) -> Callable[[], int]:
    """Build a callable that never returns the same unused port twice."""
    produced: Set[int] = set()

    def factory() -> int:
        """Return an unused port."""
        port = _unused_port(socket_type)
        # Retry until a port not handed out before is found.
        while port in produced:
            port = _unused_port(socket_type)
        produced.add(port)
        return port

    return factory


@pytest.fixture
def unused_tcp_port() -> int:
    """An unused TCP port on 127.0.0.1."""
    return _unused_port(socket.SOCK_STREAM)


@pytest.fixture
def unused_udp_port() -> int:
    """An unused UDP port on 127.0.0.1."""
    return _unused_port(socket.SOCK_DGRAM)


@pytest.fixture(scope="session")
def unused_tcp_port_factory() -> Callable[[], int]:
    """A factory function, producing different unused TCP ports."""
    return _make_unused_port_factory(socket.SOCK_STREAM)


@pytest.fixture(scope="session")
def unused_udp_port_factory() -> Callable[[], int]:
    """A factory function, producing different unused UDP ports."""
    return _make_unused_port_factory(socket.SOCK_DGRAM)