|
6 | 6 | from collections.abc import Callable
|
7 | 7 | from collections.abc import Generator
|
8 | 8 | from collections.abc import Iterable
|
| 9 | +from collections.abc import Iterator |
| 10 | +from enum import auto |
| 11 | +from enum import Enum |
9 | 12 | import inspect
|
10 | 13 | import sys
|
11 | 14 | import traceback
|
12 | 15 | import types
|
13 |
| -from typing import Any |
14 | 16 | from typing import TYPE_CHECKING
|
15 | 17 | from typing import Union
|
16 | 18 |
|
17 | 19 | import _pytest._code
|
18 | 20 | from _pytest.compat import is_async_function
|
19 | 21 | from _pytest.config import hookimpl
|
20 | 22 | from _pytest.fixtures import FixtureRequest
|
| 23 | +from _pytest.monkeypatch import MonkeyPatch |
21 | 24 | from _pytest.nodes import Collector
|
22 | 25 | from _pytest.nodes import Item
|
23 | 26 | from _pytest.outcomes import exit
|
@@ -228,8 +231,7 @@ def startTest(self, testcase: unittest.TestCase) -> None:
|
228 | 231 | pass
|
229 | 232 |
|
230 | 233 | def _addexcinfo(self, rawexcinfo: _SysExcInfoType) -> None:
|
231 |
| - # Unwrap potential exception info (see twisted trial support below). |
232 |
| - rawexcinfo = getattr(rawexcinfo, "_rawexcinfo", rawexcinfo) |
| 234 | + rawexcinfo = _handle_twisted_exc_info(rawexcinfo) |
233 | 235 | try:
|
234 | 236 | excinfo = _pytest._code.ExceptionInfo[BaseException].from_exc_info(
|
235 | 237 | rawexcinfo # type: ignore[arg-type]
|
@@ -373,49 +375,130 @@ def pytest_runtest_makereport(item: Item, call: CallInfo[None]) -> None:
|
373 | 375 | pass
|
374 | 376 |
|
375 | 377 |
|
376 |
| -# Twisted trial support. |
377 |
| -classImplements_has_run = False |
| 378 | +def _is_skipped(obj) -> bool: |
| 379 | + """Return True if the given object has been marked with @unittest.skip.""" |
| 380 | + return bool(getattr(obj, "__unittest_skip__", False)) |
| 381 | + |
| 382 | + |
| 383 | +def pytest_configure() -> None: |
| 384 | + """Register the TestCaseFunction class as an IReporter if twisted.trial is available.""" |
| 385 | + if _get_twisted_version() is not TwistedVersion.NotInstalled: |
| 386 | + from twisted.trial.itrial import IReporter |
| 387 | + from zope.interface import classImplements |
| 388 | + |
| 389 | + classImplements(TestCaseFunction, IReporter) |
| 390 | + |
| 391 | + |
| 392 | +class TwistedVersion(Enum): |
| 393 | + """ |
| 394 | + The Twisted version installed in the environment. |
| 395 | +
|
| 396 | + We have different workarounds in place for different versions of Twisted. |
| 397 | + """ |
| 398 | + |
| 399 | + # Twisted version 24 or prior. |
| 400 | + Version24 = auto() |
| 401 | + # Twisted version 25 or later. |
| 402 | + Version25 = auto() |
| 403 | + # Twisted version is not available. |
| 404 | + NotInstalled = auto() |
| 405 | + |
| 406 | + |
| 407 | +def _get_twisted_version() -> TwistedVersion: |
| 408 | + # We need to check if "twisted.trial.unittest" is specifically present in sys.modules. |
| 409 | + # This is because we intend to integrate with Trial only when it's actively running |
| 410 | + # the test suite, but not needed when only other Twisted components are in use. |
| 411 | + if "twisted.trial.unittest" not in sys.modules: |
| 412 | + return TwistedVersion.NotInstalled |
| 413 | + |
| 414 | + import importlib.metadata |
| 415 | + |
| 416 | + import packaging.version |
| 417 | + |
| 418 | + version_str = importlib.metadata.version("twisted") |
| 419 | + version = packaging.version.parse(version_str) |
| 420 | + if version.major <= 24: |
| 421 | + return TwistedVersion.Version24 |
| 422 | + else: |
| 423 | + return TwistedVersion.Version25 |
| 424 | + |
| 425 | + |
| 426 | +# Name of the attribute in `twisted.python.Failure` instances that stores |
| 427 | +# the `sys.exc_info()` tuple. |
| 428 | +# See twisted.trial support in `pytest_runtest_protocol`. |
| 429 | +TWISTED_RAW_EXCINFO_ATTR = "_twisted_raw_excinfo" |
378 | 430 |
|
379 | 431 |
|
380 | 432 | @hookimpl(wrapper=True)
|
381 |
| -def pytest_runtest_protocol(item: Item) -> Generator[None, object, object]: |
382 |
| - if isinstance(item, TestCaseFunction) and "twisted.trial.unittest" in sys.modules: |
383 |
| - ut: Any = sys.modules["twisted.python.failure"] |
384 |
| - global classImplements_has_run |
385 |
| - Failure__init__ = ut.Failure.__init__ |
386 |
| - if not classImplements_has_run: |
387 |
| - from twisted.trial.itrial import IReporter |
388 |
| - from zope.interface import classImplements |
389 |
| - |
390 |
| - classImplements(TestCaseFunction, IReporter) |
391 |
| - classImplements_has_run = True |
392 |
| - |
393 |
| - def excstore( |
| 433 | +def pytest_runtest_protocol(item: Item) -> Iterator[None]: |
| 434 | + if _get_twisted_version() is TwistedVersion.Version24: |
| 435 | + import twisted.python.failure as ut |
| 436 | + |
| 437 | + # Monkeypatch `Failure.__init__` to store the raw exception info. |
| 438 | + original__init__ = ut.Failure.__init__ |
| 439 | + |
| 440 | + def store_raw_exception_info( |
394 | 441 | self, exc_value=None, exc_type=None, exc_tb=None, captureVars=None
|
395 |
| - ): |
| 442 | + ): # pragma: no cover |
396 | 443 | if exc_value is None:
|
397 |
| - self._rawexcinfo = sys.exc_info() |
| 444 | + raw_exc_info = sys.exc_info() |
398 | 445 | else:
|
399 | 446 | if exc_type is None:
|
400 | 447 | exc_type = type(exc_value)
|
401 |
| - self._rawexcinfo = (exc_type, exc_value, exc_tb) |
| 448 | + if exc_tb is None: |
| 449 | + exc_tb = sys.exc_info()[2] |
| 450 | + raw_exc_info = (exc_type, exc_value, exc_tb) |
| 451 | + setattr(self, TWISTED_RAW_EXCINFO_ATTR, tuple(raw_exc_info)) |
402 | 452 | try:
|
403 |
| - Failure__init__( |
| 453 | + original__init__( |
404 | 454 | self, exc_value, exc_type, exc_tb, captureVars=captureVars
|
405 | 455 | )
|
406 |
| - except TypeError: |
407 |
| - Failure__init__(self, exc_value, exc_type, exc_tb) |
| 456 | + except TypeError: # pragma: no cover |
| 457 | + original__init__(self, exc_value, exc_type, exc_tb) |
408 | 458 |
|
409 |
| - ut.Failure.__init__ = excstore |
410 |
| - try: |
411 |
| - res = yield |
412 |
| - finally: |
413 |
| - ut.Failure.__init__ = Failure__init__ |
| 459 | + with MonkeyPatch.context() as patcher: |
| 460 | + patcher.setattr(ut.Failure, "__init__", store_raw_exception_info) |
| 461 | + return (yield) |
414 | 462 | else:
|
415 |
| - res = yield |
416 |
| - return res |
417 |
| - |
418 |
| - |
419 |
| -def _is_skipped(obj) -> bool: |
420 |
| - """Return True if the given object has been marked with @unittest.skip.""" |
421 |
| - return bool(getattr(obj, "__unittest_skip__", False)) |
| 463 | + return (yield) |
| 464 | + |
| 465 | + |
| 466 | +def _handle_twisted_exc_info( |
| 467 | + rawexcinfo: _SysExcInfoType | BaseException, |
| 468 | +) -> _SysExcInfoType: |
| 469 | + """ |
| 470 | + Twisted passes a custom Failure instance to `addError()` instead of using `sys.exc_info()`. |
| 471 | + Therefore, if `rawexcinfo` is a `Failure` instance, convert it into the equivalent `sys.exc_info()` tuple |
| 472 | + as expected by pytest. |
| 473 | + """ |
| 474 | + twisted_version = _get_twisted_version() |
| 475 | + if twisted_version is TwistedVersion.NotInstalled: |
| 476 | + # Unfortunately, because we cannot import `twisted.python.failure` at the top of the file |
| 477 | + # and use it in the signature, we need to use `type:ignore` here because we cannot narrow |
| 478 | + # the type properly in the `if` statement above. |
| 479 | + return rawexcinfo # type:ignore[return-value] |
| 480 | + elif twisted_version is TwistedVersion.Version24: |
| 481 | + # Twisted calls addError() passing its own classes (like `twisted.python.Failure`), which violates |
| 482 | + # the `addError()` signature, so we extract the original `sys.exc_info()` tuple which is stored |
| 483 | + # in the object. |
| 484 | + if hasattr(rawexcinfo, TWISTED_RAW_EXCINFO_ATTR): |
| 485 | + saved_exc_info = getattr(rawexcinfo, TWISTED_RAW_EXCINFO_ATTR) |
| 486 | + # Delete the attribute from the original object to avoid leaks. |
| 487 | + delattr(rawexcinfo, TWISTED_RAW_EXCINFO_ATTR) |
| 488 | + return saved_exc_info # type:ignore[no-any-return] |
| 489 | + return rawexcinfo # type:ignore[return-value] |
| 490 | + elif twisted_version is TwistedVersion.Version25: |
| 491 | + if isinstance(rawexcinfo, BaseException): |
| 492 | + import twisted.python.failure |
| 493 | + |
| 494 | + if isinstance(rawexcinfo, twisted.python.failure.Failure): |
| 495 | + tb = rawexcinfo.__traceback__ |
| 496 | + if tb is None: |
| 497 | + tb = sys.exc_info()[2] |
| 498 | + return type(rawexcinfo.value), rawexcinfo.value, tb |
| 499 | + |
| 500 | + return rawexcinfo # type:ignore[return-value] |
| 501 | + else: |
| 502 | + # Ideally we would use assert_never() here, but it is not available in all Python versions |
| 503 | + # we support, plus we do not require `type_extensions` currently. |
| 504 | + assert False, f"Unexpected Twisted version: {twisted_version}" |
0 commit comments