Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog/13322.bugfix.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fixed an issue where a plugin hook (e.g. ``pytest_sessionfinish`` or ``pytest_terminal_summary``) that called ``CaptureManager.suspend_global_capture(in_=True)`` from a context where ``out`` and ``err`` were already suspended would have its subsequent ``CaptureManager.resume_global_capture()`` unexpectedly re-enable stdout/stderr capture, causing terminal-width and capture-state inconsistencies.
70 changes: 58 additions & 12 deletions src/_pytest/capture.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,9 @@ def buffer(self) -> BinaryIO:

class CaptureBase(abc.ABC, Generic[AnyStr]):
EMPTY_BUFFER: AnyStr
# Set by stateful subclasses in ``__init__``; left as ``None`` on
# stateless implementations such as ``NoCapture``.
_state: str | None = None

@abc.abstractmethod
def __init__(self, fd: int) -> None:
Expand Down Expand Up @@ -330,6 +333,10 @@ def writeorg(self, data: AnyStr) -> None:
def snap(self) -> AnyStr:
raise NotImplementedError()

def is_started(self) -> bool:
"""Whether actively capturing -- not initialized, suspended, or done."""
return self._state == "started"


patchsysdict = {0: "stdin", 1: "stdout", 2: "stderr"}

Expand Down Expand Up @@ -627,7 +634,11 @@ class CaptureResult(

class MultiCapture(Generic[AnyStr]):
_state = None
_in_suspended = False
# Snapshot of (out_started, err_started) captured at the moment of the
# first nested ``suspend_capturing(in_=True)`` call. ``None`` when no
# nested suspend is in progress; non-``None`` iff the matching
# ``resume_capturing`` still needs to restore that prior state (#13322).
_pre_suspend_state: tuple[bool, bool] | None = None

def __init__(
self,
Expand All @@ -640,10 +651,13 @@ def __init__(
self.err: CaptureBase[AnyStr] | None = err

def __repr__(self) -> str:
return (
base = (
f"<MultiCapture out={self.out!r} err={self.err!r} in_={self.in_!r} "
f"_state={self._state!r} _in_suspended={self._in_suspended!r}>"
f"_state={self._state!r}"
)
if self._pre_suspend_state is not None:
base += f" _pre_suspend_state={self._pre_suspend_state!r}"
return base + ">"

def start_capturing(self) -> None:
self._state = "started"
Expand All @@ -666,25 +680,57 @@ def pop_outerr_to_orig(self) -> tuple[AnyStr, AnyStr]:
return out, err

def suspend_capturing(self, in_: bool = False) -> None:
"""Suspend out/err (and ``in_`` when ``in_=True``).

When ``in_=True`` and no nested suspend is already in progress, the
prior started/suspended state of out and err is snapshotted so that
the matching :meth:`resume_capturing` restores them to that state
instead of unconditionally restarting them. This supports plugin
hooks that suspend capture once for out/err and then nest a second
``in_=True`` suspend to read user input -- without that snapshot,
the resume would re-enable out/err even though the caller intended
them to remain suspended (#13322).

The contract is that ``suspend_capturing(in_=True)`` and
:meth:`resume_capturing` are called as a matched pair; nesting is
single-depth, not arbitrary.
"""
self._state = "suspended"
if in_ and self.in_ is not None and self._pre_suspend_state is None:
self._pre_suspend_state = (
self.out is not None and self.out.is_started(),
self.err is not None and self.err.is_started(),
)
self.in_.suspend()
if self.out:
self.out.suspend()
if self.err:
self.err.suspend()
if in_ and self.in_:
self.in_.suspend()
self._in_suspended = True

def resume_capturing(self) -> None:
"""Resume out/err (and ``in_`` if a matching nested suspend is active).

If the matching :meth:`suspend_capturing` was called with
``in_=True``, the snapshot taken at that point determines whether
out/err are restored to ``started`` -- streams that were already
suspended before the nested suspend remain suspended (#13322).
Otherwise, out/err are unconditionally resumed (the original
behaviour).
"""
self._state = "started"
if self.out:
self.out.resume()
if self.err:
self.err.resume()
if self._in_suspended:
snap = self._pre_suspend_state
if snap is not None:
assert self.in_ is not None
self.in_.resume()
self._in_suspended = False
out_was_started, err_was_started = snap
self._pre_suspend_state = None
else:
out_was_started = self.out is not None
err_was_started = self.err is not None
if out_was_started and self.out is not None:
self.out.resume()
if err_was_started and self.err is not None:
self.err.resume()

def stop_capturing(self) -> None:
"""Stop capturing and reset capturing streams."""
Expand Down
106 changes: 106 additions & 0 deletions testing/test_capture.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,67 @@ def test_init_capturing(self):
finally:
capouter.stop_capturing()

@pytest.mark.parametrize("method", ["sys", "fd"])
def test_suspend_in_preserves_out_err_suspended_state(self, method) -> None:
"""suspend_global_capture(in_=True) + resume_global_capture() must
not re-enable out/err capture if they were already suspended (#13322).
"""
capouter = StdCaptureFD()
try:
capman = CaptureManager(method)
capman.start_global_capturing()
mc = capman._global_capturing
assert mc is not None
assert mc.out is not None
assert mc.err is not None
assert mc.in_ is not None

capman.suspend_global_capture(in_=False)
assert not mc.out.is_started()
assert not mc.err.is_started()
assert mc.in_.is_started()

capman.suspend_global_capture(in_=True)
assert not mc.in_.is_started()

capman.resume_global_capture()
assert not mc.out.is_started()
assert not mc.err.is_started()
assert mc.in_.is_started()

capman.stop_global_capturing()
finally:
capouter.stop_capturing()

@pytest.mark.parametrize("method", ["sys", "fd"])
def test_suspend_in_restores_out_err_started_state(self, method) -> None:
"""suspend_global_capture(in_=True) + resume_global_capture() restores
out/err to started when that was their state before the suspend (#13322).
"""
capouter = StdCaptureFD()
try:
capman = CaptureManager(method)
capman.start_global_capturing()
mc = capman._global_capturing
assert mc is not None
assert mc.out is not None
assert mc.err is not None
assert mc.in_ is not None

capman.suspend_global_capture(in_=True)
assert not mc.out.is_started()
assert not mc.err.is_started()
assert not mc.in_.is_started()

capman.resume_global_capture()
assert mc.out.is_started()
assert mc.err.is_started()
assert mc.in_.is_started()

capman.stop_global_capturing()
finally:
capouter.stop_capturing()


@pytest.mark.parametrize("method", ["fd", "sys"])
def test_capturing_unicode(pytester: Pytester, method: str) -> None:
Expand Down Expand Up @@ -1706,6 +1767,51 @@ def test_logging():
result.stdout.no_fnmatch_line("*during collection*")


def test_nested_suspend_in_preserves_outer_suspended_state(
pytester: Pytester,
) -> None:
"""Black-box regression for #13322.

A plugin hook that suspends out/err first and then nests a
``suspend_global_capture(in_=True)`` must observe that the matching
``resume_global_capture()`` leaves out/err in their prior suspended
state, so that prints from the hook after the resume reach the real
terminal *immediately* (i.e. before the summary line).

On the pre-fix implementation, ``MARK_AFTER_RESUME`` would be
re-captured by the buggy resume and only flushed to real stdout at
``stop_global_capturing`` cleanup time, which runs after
``pytest_unconfigure`` and therefore after the ``N passed`` summary
line. Asserting the marker appears *before* the summary line is the
deterministic discriminator.
"""
pytester.makeconftest("""
import pytest


def pytest_terminal_summary(config):
capture = config.pluginmanager.getplugin("capturemanager")
capture.suspend_global_capture(in_=False)
print("MARK_BEFORE_NESTED")
capture.suspend_global_capture(in_=True)
capture.resume_global_capture()
print("MARK_AFTER_RESUME")
""")
pytester.makepyfile("def test_x(): pass")
result = pytester.runpytest_subprocess()
# Both marks must appear in order, AND both must appear before the
# final summary line. On the buggy code MARK_AFTER_RESUME would only
# be flushed at CaptureManager teardown -- i.e. after "1 passed".
result.stdout.fnmatch_lines(
[
"*MARK_BEFORE_NESTED*",
"*MARK_AFTER_RESUME*",
"*1 passed*",
]
)
assert result.ret == 0


def test_libedit_workaround(pytester: Pytester) -> None:
pytester.makeconftest("""
import pytest
Expand Down