(bugfix) Prune snapshots with preserved read-only permissions
Make SQL retention delete the snapshot root when records point at the snapshot data directory, matching how backup metadata is stored on disk. Before removing a snapshot tree, temporarily add user write permission to directories inside that snapshot so rsync-preserved source permissions do not block cleanup. Add a regression test for pruning snapshots whose data directory mirrors a read-only remote root.
This commit is contained in:
@@ -1,6 +1,7 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import shutil
|
||||
import stat
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
@@ -89,17 +90,17 @@ def run_sql_retention_apply(
|
||||
if snap_kind not in {"scheduled", "manual"}:
|
||||
raise ConfigError(f"Refusing to delete unsupported snapshot kind: {snap_kind!r}")
|
||||
|
||||
path = Path(snap_path)
|
||||
path = _snapshot_delete_path(path=Path(snap_path), dirname=dirname)
|
||||
if not path.exists():
|
||||
actions.append(f"skip missing {snap_kind}/{dirname}")
|
||||
continue
|
||||
if not path.is_dir():
|
||||
raise ConfigError(f"Refusing to delete non-directory path: {snap_path}")
|
||||
raise ConfigError(f"Refusing to delete non-directory path: {path}")
|
||||
|
||||
shutil.rmtree(path)
|
||||
_remove_snapshot_tree(path)
|
||||
SnapshotRecord.objects.filter(host__host=host, kind=snap_kind, dirname=dirname).delete()
|
||||
actions.append(f"deleted {snap_kind} {dirname}")
|
||||
deleted.append({"dirname": dirname, "kind": snap_kind, "path": snap_path})
|
||||
deleted.append({"dirname": dirname, "kind": snap_kind, "path": str(path)})
|
||||
|
||||
return {
|
||||
"ok": True,
|
||||
@@ -180,3 +181,22 @@ def _snapshot_to_delete_item(snapshot: Snapshot) -> dict[str, Any]:
|
||||
"dt": snapshot.dt.isoformat(),
|
||||
"status": snapshot.status,
|
||||
}
|
||||
|
||||
|
||||
def _snapshot_delete_path(*, path: Path, dirname: str) -> Path:
|
||||
if path.name == "data" and path.parent.name == dirname:
|
||||
return path.parent
|
||||
return path
|
||||
|
||||
|
||||
def _remove_snapshot_tree(path: Path) -> None:
|
||||
_make_directories_user_writable(path)
|
||||
shutil.rmtree(path)
|
||||
|
||||
|
||||
def _make_directories_user_writable(path: Path) -> None:
|
||||
for directory in [path, *[child for child in path.rglob("*") if child.is_dir() and not child.is_symlink()]]:
|
||||
mode = directory.stat().st_mode
|
||||
if mode & stat.S_IWUSR:
|
||||
continue
|
||||
directory.chmod(mode | stat.S_IWUSR)
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import stat
|
||||
from datetime import datetime, timezone
|
||||
from io import StringIO
|
||||
from pathlib import Path
|
||||
@@ -85,6 +86,42 @@ class SqlRetentionTests(TestCase):
|
||||
self.assertFalse(SnapshotRecord.objects.filter(pk=old.pk).exists())
|
||||
self.assertEqual(result["deleted"], [{"dirname": old.dirname, "kind": "scheduled", "path": str(old_dir)}])
|
||||
|
||||
def test_apply_deletes_snapshot_with_readonly_data_directory(self) -> None:
|
||||
with TemporaryDirectory() as tmp:
|
||||
prefix = Path(tmp) / "home"
|
||||
host = HostConfig.objects.create(
|
||||
host="web-01",
|
||||
address="web-01.example.test",
|
||||
retention_daily=0,
|
||||
retention_weekly=0,
|
||||
retention_monthly=0,
|
||||
retention_yearly=0,
|
||||
)
|
||||
old_dir = Path(tmp) / "backups" / host.host / "scheduled" / "20260518-021500Z__OLD"
|
||||
old_data = old_dir / "data"
|
||||
old_data.mkdir(parents=True)
|
||||
old_data.joinpath("etc").mkdir()
|
||||
old_data.joinpath("etc", "config").write_text("preserved permissions\n")
|
||||
old_data.chmod(stat.S_IREAD | stat.S_IEXEC | stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH)
|
||||
new_dir = Path(tmp) / "backups" / host.host / "scheduled" / "20260519-021500Z__NEW"
|
||||
new_dir.mkdir(parents=True)
|
||||
old = self._snapshot(host, old_dir.name, path=str(old_data))
|
||||
self._snapshot(host, new_dir.name, path=str(new_dir))
|
||||
|
||||
result = run_sql_retention_apply(
|
||||
prefix=prefix,
|
||||
host=host.host,
|
||||
kind="scheduled",
|
||||
protect_bases=False,
|
||||
yes=True,
|
||||
max_delete=1,
|
||||
acquire_lock=False,
|
||||
)
|
||||
|
||||
self.assertFalse(old_dir.exists())
|
||||
self.assertFalse(SnapshotRecord.objects.filter(pk=old.pk).exists())
|
||||
self.assertEqual(result["deleted"], [{"dirname": old.dirname, "kind": "scheduled", "path": str(old_dir)}])
|
||||
|
||||
def test_apply_respects_max_delete(self) -> None:
|
||||
host = HostConfig.objects.create(
|
||||
host="web-01",
|
||||
|
||||
Reference in New Issue
Block a user