Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog/7940.bugfix.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fixed ``rm_rf`` failing to remove directories when the ``S_IXUSR`` (owner execute) permission bit is missing -- supersedes :pr:`7941`.
67 changes: 53 additions & 14 deletions src/_pytest/pathlib.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,31 @@ def get_lock_path(path: _AnyPurePath) -> _AnyPurePath:
return path.joinpath(".lock")


def _chmod_rwx(p: str) -> bool:
"""Grant owner sufficient permissions for deletion.

Directories get ``S_IRWXU`` (read+write+exec for traversal).
Regular files get ``S_IRUSR | S_IWUSR`` only, to avoid making
non-executable files executable as a side effect.

Returns True if permissions were actually changed, False if they were
already sufficient or couldn't be changed.
"""
import stat

try:
old_mode = os.stat(p).st_mode
perm_mode = stat.S_IMODE(old_mode)
bits = stat.S_IRWXU if stat.S_ISDIR(old_mode) else stat.S_IRUSR | stat.S_IWUSR
new_mode = perm_mode | bits
if perm_mode == new_mode:
return False
os.chmod(p, new_mode)
except OSError:
return False
return True


def on_rm_rf_error(
func: Callable[..., Any] | None,
path: str,
Expand Down Expand Up @@ -98,32 +123,46 @@ def on_rm_rf_error(
)
return False

if func in (os.open, os.scandir):
# Directory traversal failed (e.g. missing S_IXUSR). Fix permissions
# on the path and its parent, then remove it ourselves since rmtree
# skips entries after the error handler returns.
# See: https://github.com/pytest-dev/pytest/issues/7940
p = Path(path)
parent_changed = False
if p.parent != p:
parent_changed = _chmod_rwx(str(p.parent))
path_changed = _chmod_rwx(path)
if not (parent_changed or path_changed):
return False
if os.path.isdir(path):
rm_rf(Path(path))
else:
try:
os.unlink(path)
except OSError:
return False
return True

if func not in (os.rmdir, os.remove, os.unlink):
if func not in (os.open,):
warnings.warn(
PytestWarning(
f"(rm_rf) unknown function {func} when removing {path}:\n{type(exc)}: {exc}"
)
warnings.warn(
PytestWarning(
f"(rm_rf) unknown function {func} when removing {path}:\n{type(exc)}: {exc}"
)
)
return False

# Chmod + retry.
import stat

def chmod_rw(p: str) -> None:
mode = os.stat(p).st_mode
os.chmod(p, mode | stat.S_IRUSR | stat.S_IWUSR)

# For files, we need to recursively go upwards in the directories to
# ensure they all are also writable.
# ensure they all are also accessible and writable.
p = Path(path)
if p.is_file():
for parent in p.parents:
chmod_rw(str(parent))
_chmod_rwx(str(parent))
# Stop when we reach the original path passed to rm_rf.
if parent == start_path:
break
chmod_rw(str(path))
_chmod_rwx(str(path))

func(path)
return True
Expand Down
78 changes: 71 additions & 7 deletions testing/test_tmpdir.py
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,25 @@ def test_rm_rf_with_read_only_directory(self, tmp_path):

assert not adir.is_dir()

@pytest.mark.skipif(not hasattr(os, "getuid"), reason="unix permissions")
def test_rm_rf_with_no_exec_permission_directories(self, tmp_path):
"""Ensure rm_rf can remove directories without S_IXUSR (#7940).

This is the exact scenario from the original issue: nested directories
and files with all permissions stripped.
"""
p = tmp_path / "foo" / "bar" / "baz"
p.parent.mkdir(parents=True)
p.touch(mode=0)
for parent in p.parents:
if parent == tmp_path:
break
parent.chmod(mode=0)

rm_rf(tmp_path / "foo")

assert not (tmp_path / "foo").exists()

def test_on_rm_rf_error(self, tmp_path: Path) -> None:
adir = tmp_path / "dir"
adir.mkdir()
Expand Down Expand Up @@ -527,17 +546,62 @@ def test_on_rm_rf_error(self, tmp_path: Path) -> None:
on_rm_rf_error(None, str(fn), exc_info3, start_path=tmp_path)
assert fn.is_file()

# ignored function
with warnings.catch_warnings(record=True) as w:
exc_info4 = PermissionError()
on_rm_rf_error(os.open, str(fn), exc_info4, start_path=tmp_path)
assert fn.is_file()
assert not [x.message for x in w]

# os.unlink PermissionError is handled (chmod + retry)
exc_info5 = PermissionError()
on_rm_rf_error(os.unlink, str(fn), exc_info5, start_path=tmp_path)
assert not fn.is_file()

def test_on_rm_rf_error_os_open_handles_file(self, tmp_path: Path) -> None:
"""os.open PermissionError on a file is handled by fixing
permissions and removing it (#7940)."""
adir = tmp_path / "dir"
adir.mkdir()
fn = adir / "foo.txt"
fn.touch()
self.chmod_r(fn)

with warnings.catch_warnings(record=True) as w:
exc_info = PermissionError()
on_rm_rf_error(os.open, str(fn), exc_info, start_path=tmp_path)
assert not fn.exists()
assert not [x.message for x in w]

@pytest.mark.skipif(not hasattr(os, "getuid"), reason="unix permissions")
def test_on_rm_rf_error_os_open_handles_directory(self, tmp_path: Path) -> None:
"""os.open PermissionError on a directory is handled by fixing
permissions and recursively removing it (#7940)."""
adir = tmp_path / "dir"
adir.mkdir()
(adir / "child").mkdir()
(adir / "child" / "file.txt").touch()
os.chmod(str(adir), 0o600)

with warnings.catch_warnings(record=True) as w:
exc_info = PermissionError()
on_rm_rf_error(os.open, str(adir), exc_info, start_path=tmp_path)
assert not adir.exists()
assert not [x.message for x in w]

@pytest.mark.skipif(not hasattr(os, "getuid"), reason="unix permissions")
def test_on_rm_rf_error_os_open_parent_perms(self, tmp_path: Path) -> None:
"""When the PermissionError is caused by the *parent* directory lacking
S_IXUSR, fixing the parent is sufficient even if the child already has
correct permissions."""
parent = tmp_path / "parent"
parent.mkdir()
child = parent / "child"
child.mkdir()
(child / "file.txt").touch()
# Child has full perms, but parent lacks execute -> os.open(child) fails.
os.chmod(str(parent), 0o600)

with warnings.catch_warnings(record=True) as w:
exc_info = PermissionError()
result = on_rm_rf_error(os.open, str(child), exc_info, start_path=tmp_path)
assert result is True
assert not child.exists()
assert not [x.message for x in w]


def attempt_symlink_to(path, to_path):
"""Try to make a symlink from "path" to "to_path", skipping in case this platform
Expand Down