(bugfix) Make snapshot pruning robust to archived permissions
Repair user permissions inside snapshot trees before deleting them so retention prune and incomplete cleanup can remove directories copied with restrictive rsync archive modes. Add path validation for scheduled/manual snapshot deletes and cover non-traversable nested directories in retention tests.
This commit is contained in:
@@ -103,6 +103,7 @@ def run_sql_retention_apply(
|
||||
raise ConfigError(f"Refusing to delete unsupported snapshot kind: {snap_kind!r}")
|
||||
|
||||
path = _snapshot_delete_path(path=Path(snap_path), dirname=dirname)
|
||||
_validate_snapshot_delete_path(host=host, kind=snap_kind, path=path, dirname=dirname)
|
||||
reason = str(item.get("reason") or "outside retention policy")
|
||||
if not path.exists():
|
||||
actions.append(f"skip missing {snap_kind}/{dirname}")
|
||||
@@ -339,14 +340,55 @@ def _validate_incomplete_delete_path(*, host: str, path: Path, dirname: str) ->
|
||||
raise ConfigError(f"Refusing to delete incomplete snapshot outside host backup root: {path}")
|
||||
|
||||
|
||||
def _validate_snapshot_delete_path(*, host: str, kind: str, path: Path, dirname: str) -> None:
|
||||
if kind not in {SnapshotRecord.Kind.SCHEDULED, SnapshotRecord.Kind.MANUAL}:
|
||||
raise ConfigError(f"Refusing to delete unsupported snapshot kind: {kind!r}")
|
||||
path_parts = path.parts
|
||||
if path.name != dirname or kind not in path_parts or host not in path_parts:
|
||||
raise ConfigError(f"Refusing to delete unexpected snapshot path: {path}")
|
||||
kind_index = path_parts.index(kind)
|
||||
if kind_index == 0 or path_parts[kind_index - 1] != host:
|
||||
raise ConfigError(f"Refusing to delete snapshot outside host backup root: {path}")
|
||||
|
||||
|
||||
def _remove_snapshot_tree(path: Path) -> None:
|
||||
_make_directories_user_writable(path)
|
||||
shutil.rmtree(path)
|
||||
_make_snapshot_tree_user_removable(path)
|
||||
shutil.rmtree(path, onexc=_retry_remove_with_user_permissions)
|
||||
|
||||
|
||||
def _make_directories_user_writable(path: Path) -> None:
|
||||
for directory in [path, *[child for child in path.rglob("*") if child.is_dir() and not child.is_symlink()]]:
|
||||
mode = directory.stat().st_mode
|
||||
if mode & stat.S_IWUSR:
|
||||
def _make_snapshot_tree_user_removable(path: Path) -> None:
|
||||
stack = [path]
|
||||
while stack:
|
||||
directory = stack.pop()
|
||||
if directory.is_symlink():
|
||||
continue
|
||||
directory.chmod(mode | stat.S_IWUSR)
|
||||
_make_path_user_removable(directory)
|
||||
try:
|
||||
children = list(directory.iterdir())
|
||||
except OSError:
|
||||
continue
|
||||
for child in children:
|
||||
if child.is_dir() and not child.is_symlink():
|
||||
stack.append(child)
|
||||
|
||||
|
||||
def _retry_remove_with_user_permissions(function: Any, path: str, excinfo: BaseException) -> None:
|
||||
failed_path = Path(path)
|
||||
_make_path_user_removable(failed_path)
|
||||
function(path)
|
||||
|
||||
|
||||
def _make_path_user_removable(path: Path) -> None:
|
||||
try:
|
||||
mode = path.stat().st_mode
|
||||
except OSError:
|
||||
return
|
||||
wanted = stat.S_IRUSR | stat.S_IWUSR
|
||||
if path.is_dir() and not path.is_symlink():
|
||||
wanted |= stat.S_IXUSR
|
||||
if mode & wanted == wanted:
|
||||
return
|
||||
try:
|
||||
path.chmod(mode | wanted)
|
||||
except OSError:
|
||||
return
|
||||
|
||||
Reference in New Issue
Block a user