Files
pobsync/src/pobsync_backend/retention.py
Peter van Arkel ea9e3e41e3 (release) Add purged snapshot audit overview
Record snapshot purge history whenever retention or incomplete cleanup removes
snapshot directories and SQL records. Store the purge reason, original kind,
path, action source, and triggering operator so manual, scheduled, CLI, and
incomplete cleanup actions remain auditable after the original snapshot record
is deleted.

Add a staff-only Purged Snapshots page with host/action filters and register
the audit model in Django admin.

Refs #16
Refs #8
2026-05-21 03:46:38 +02:00

353 lines
12 KiB
Python

from __future__ import annotations
import shutil
import stat
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from pobsync.errors import ConfigError
from pobsync.lock import acquire_host_lock
from pobsync.paths import PobsyncPaths
from pobsync.retention import Snapshot, apply_base_protection, build_retention_plan
from pobsync.util import sanitize_host
from .models import HostConfig, PurgedSnapshot, SnapshotRecord
def run_sql_retention_plan(*, host: str, kind: str, protect_bases: bool) -> dict[str, Any]:
host = sanitize_host(host)
if kind not in {"scheduled", "manual", "all"}:
raise ConfigError("kind must be scheduled, manual, or all")
host_config = _enabled_host_config(host)
retention = _retention_for_host(host_config)
snapshots = _snapshots_for_retention(host_config=host_config, kind=kind)
incomplete_snapshots = _incomplete_snapshots_for_host(host_config)
plan = build_retention_plan(
snapshots=snapshots,
retention=retention,
now=datetime.now(timezone.utc),
)
keep = set(plan.keep)
reasons = dict(plan.reasons)
if protect_bases:
keep, reasons = apply_base_protection(snapshots=snapshots, keep=keep, reasons=reasons)
delete = [snapshot for snapshot in snapshots if snapshot.dirname not in keep]
keep_items = [snapshot for snapshot in snapshots if snapshot.dirname in keep]
return {
"ok": True,
"host": host,
"kind": kind,
"protect_bases": bool(protect_bases),
"retention": retention,
"source": "sql",
"keep": sorted(keep),
"keep_items": [_snapshot_to_item(snapshot, reasons=reasons.get(snapshot.dirname, [])) for snapshot in keep_items],
"delete": [_snapshot_to_item(snapshot, reasons=["outside retention policy"]) for snapshot in delete],
"incomplete": [
_snapshot_to_item(snapshot, reasons=["incomplete snapshot; excluded from retention cleanup"])
for snapshot in incomplete_snapshots
],
"reasons": reasons,
}
def run_sql_retention_apply(
*,
prefix: Path,
host: str,
kind: str,
protect_bases: bool,
yes: bool,
max_delete: int,
action: str = PurgedSnapshot.Action.MANUAL,
triggered_by: str = "",
acquire_lock: bool = True,
) -> dict[str, Any]:
host = sanitize_host(host)
if not yes:
raise ConfigError("Refusing to delete snapshots without --yes")
if max_delete < 0:
raise ConfigError("--max-delete must be >= 0")
paths = PobsyncPaths(home=prefix)
def _do_apply() -> dict[str, Any]:
plan = run_sql_retention_plan(host=host, kind=kind, protect_bases=bool(protect_bases))
delete_list = plan.get("delete") or []
incomplete_list = plan.get("incomplete") or []
if not isinstance(delete_list, list):
raise ConfigError("Invalid retention plan output: delete is not a list")
if not isinstance(incomplete_list, list):
raise ConfigError("Invalid retention plan output: incomplete is not a list")
if max_delete == 0 and len(delete_list) > 0:
raise ConfigError("Deletion blocked by --max-delete=0")
if len(delete_list) > max_delete:
raise ConfigError(f"Refusing to delete {len(delete_list)} snapshots (exceeds --max-delete={max_delete})")
actions: list[str] = []
deleted: list[dict[str, Any]] = []
for item in delete_list:
dirname = item.get("dirname") if isinstance(item, dict) else None
snap_kind = item.get("kind") if isinstance(item, dict) else None
snap_path = item.get("path") if isinstance(item, dict) else None
if not isinstance(dirname, str) or not isinstance(snap_kind, str) or not isinstance(snap_path, str):
continue
if snap_kind not in {"scheduled", "manual"}:
raise ConfigError(f"Refusing to delete unsupported snapshot kind: {snap_kind!r}")
path = _snapshot_delete_path(path=Path(snap_path), dirname=dirname)
reason = str(item.get("reason") or "outside retention policy")
if not path.exists():
actions.append(f"skip missing {snap_kind}/{dirname}")
continue
if not path.is_dir():
raise ConfigError(f"Refusing to delete non-directory path: {path}")
_remove_snapshot_tree(path)
_record_purged_snapshot(
host_config=_enabled_host_config(host),
kind=snap_kind,
dirname=dirname,
path=path,
reason=reason,
action=action,
triggered_by=triggered_by,
metadata={"source": "retention", "protect_bases": bool(protect_bases), "retention_kind": kind},
)
SnapshotRecord.objects.filter(host__host=host, kind=snap_kind, dirname=dirname).delete()
actions.append(f"deleted {snap_kind} {dirname}")
deleted.append({"dirname": dirname, "kind": snap_kind, "path": str(path), "reason": reason})
return {
"ok": True,
"host": host,
"kind": kind,
"protect_bases": bool(protect_bases),
"max_delete": max_delete,
"source": "sql",
"planned_delete_count": len(delete_list),
"incomplete_ignored_count": len(incomplete_list),
"deleted": deleted,
"actions": actions,
}
if acquire_lock:
with acquire_host_lock(paths.locks_dir, host, command="retention-apply"):
return _do_apply()
return _do_apply()
def run_incomplete_cleanup(
*,
prefix: Path,
host: str,
yes: bool,
max_delete: int,
triggered_by: str = "",
acquire_lock: bool = True,
) -> dict[str, Any]:
host = sanitize_host(host)
if not yes:
raise ConfigError("Refusing to delete incomplete snapshots without --yes")
if max_delete < 0:
raise ConfigError("--max-delete must be >= 0")
paths = PobsyncPaths(home=prefix)
def _do_cleanup() -> dict[str, Any]:
host_config = _enabled_host_config(host)
incomplete_list = [
_snapshot_to_item(snapshot, reasons=["manual incomplete cleanup"])
for snapshot in _incomplete_snapshots_for_host(host_config)
]
if max_delete == 0 and len(incomplete_list) > 0:
raise ConfigError("Incomplete cleanup blocked by --max-delete=0")
if len(incomplete_list) > max_delete:
raise ConfigError(
f"Refusing to delete {len(incomplete_list)} incomplete snapshots (exceeds --max-delete={max_delete})"
)
actions: list[str] = []
deleted: list[dict[str, Any]] = []
for item in incomplete_list:
dirname = item["dirname"]
snap_path = Path(item["path"])
path = _snapshot_delete_path(path=snap_path, dirname=dirname)
_validate_incomplete_delete_path(host=host, path=path, dirname=dirname)
if not path.exists():
actions.append(f"skip missing incomplete/{dirname}")
elif not path.is_dir():
raise ConfigError(f"Refusing to delete non-directory path: {path}")
else:
_remove_snapshot_tree(path)
actions.append(f"deleted incomplete {dirname}")
_record_purged_snapshot(
host_config=host_config,
kind=SnapshotRecord.Kind.INCOMPLETE,
dirname=dirname,
path=path,
reason="manual incomplete cleanup",
action=PurgedSnapshot.Action.INCOMPLETE_CLEANUP,
triggered_by=triggered_by,
metadata={"source": "incomplete_cleanup"},
)
SnapshotRecord.objects.filter(
host__host=host,
kind=SnapshotRecord.Kind.INCOMPLETE,
dirname=dirname,
).delete()
deleted.append({"dirname": dirname, "kind": SnapshotRecord.Kind.INCOMPLETE, "path": str(path)})
return {
"ok": True,
"host": host,
"kind": SnapshotRecord.Kind.INCOMPLETE,
"max_delete": max_delete,
"source": "sql",
"planned_delete_count": len(incomplete_list),
"deleted": deleted,
"actions": actions,
}
if acquire_lock:
with acquire_host_lock(paths.locks_dir, host, command="incomplete-cleanup"):
return _do_cleanup()
return _do_cleanup()
def _enabled_host_config(host: str) -> HostConfig:
try:
return HostConfig.objects.get(host=host, enabled=True)
except HostConfig.DoesNotExist as exc:
raise ConfigError(f"Missing enabled host {host!r}") from exc
def _retention_for_host(host_config: HostConfig) -> dict[str, int]:
return {
"daily": host_config.retention_daily,
"weekly": host_config.retention_weekly,
"monthly": host_config.retention_monthly,
"yearly": host_config.retention_yearly,
}
def _snapshots_for_retention(*, host_config: HostConfig, kind: str) -> list[Snapshot]:
kinds = ["scheduled", "manual"] if kind == "all" else [kind]
records = (
SnapshotRecord.objects.filter(host=host_config, kind__in=kinds)
.exclude(kind=SnapshotRecord.Kind.INCOMPLETE)
.select_related("base")
.order_by("-started_at", "dirname")
)
return [_snapshot_from_record(record) for record in records]
def _incomplete_snapshots_for_host(host_config: HostConfig) -> list[Snapshot]:
records = (
SnapshotRecord.objects.filter(host=host_config, kind=SnapshotRecord.Kind.INCOMPLETE)
.select_related("base")
.order_by("-started_at", "dirname")
)
return [_snapshot_from_record(record) for record in records]
def _snapshot_from_record(record: SnapshotRecord) -> Snapshot:
return Snapshot(
kind=record.kind,
dirname=record.dirname,
path=record.path,
dt=record.started_at or datetime.fromtimestamp(0, tz=timezone.utc),
status=record.status or None,
base=_base_meta_from_record(record),
)
def _base_meta_from_record(record: SnapshotRecord) -> dict[str, str] | None:
if record.base is not None:
return {
"kind": record.base.kind,
"dirname": record.base.dirname,
"path": record.base.path,
}
if record.base_kind and record.base_dirname:
return {
"kind": record.base_kind,
"dirname": record.base_dirname,
"path": record.base_path,
}
return None
def _snapshot_to_item(snapshot: Snapshot, *, reasons: list[str]) -> dict[str, Any]:
return {
"dirname": snapshot.dirname,
"kind": snapshot.kind,
"path": snapshot.path,
"dt": snapshot.dt.isoformat(),
"status": snapshot.status,
"reasons": reasons,
"reason": ", ".join(reasons),
}
def _snapshot_delete_path(*, path: Path, dirname: str) -> Path:
if path.name == "data" and path.parent.name == dirname:
return path.parent
return path
def _record_purged_snapshot(
*,
host_config: HostConfig,
kind: str,
dirname: str,
path: Path,
reason: str,
action: str,
triggered_by: str,
metadata: dict[str, Any],
) -> None:
PurgedSnapshot.objects.create(
host=host_config,
host_name=host_config.host,
kind=kind,
dirname=dirname,
path=str(path),
reason=reason,
action=action,
triggered_by=triggered_by,
metadata=metadata,
)
def _validate_incomplete_delete_path(*, host: str, path: Path, dirname: str) -> None:
path_parts = path.parts
if path.name != dirname or ".incomplete" not in path_parts or host not in path_parts:
raise ConfigError(f"Refusing to delete unexpected incomplete snapshot path: {path}")
incomplete_index = path_parts.index(".incomplete")
if incomplete_index == 0 or path_parts[incomplete_index - 1] != host:
raise ConfigError(f"Refusing to delete incomplete snapshot outside host backup root: {path}")
def _remove_snapshot_tree(path: Path) -> None:
_make_directories_user_writable(path)
shutil.rmtree(path)
def _make_directories_user_writable(path: Path) -> None:
for directory in [path, *[child for child in path.rglob("*") if child.is_dir() and not child.is_symlink()]]:
mode = directory.stat().st_mode
if mode & stat.S_IWUSR:
continue
directory.chmod(mode | stat.S_IWUSR)