Require incomplete snapshots to be marked reviewed before the cleanup action can delete them, and show review state in the retention plan UI. Keep cleanup confirmation counts scoped to reviewed incomplete snapshots and add coverage for blocked, reviewed, and deletion flows.
432 lines
15 KiB
Python
from __future__ import annotations
|
|
|
|
import shutil
|
|
import stat
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
from pobsync.errors import ConfigError
|
|
from pobsync.lock import acquire_host_lock
|
|
from pobsync.paths import PobsyncPaths
|
|
from pobsync.retention import Snapshot, apply_base_protection, build_retention_plan
|
|
from pobsync.util import sanitize_host
|
|
|
|
from .models import HostConfig, PurgedSnapshot, SnapshotRecord
|
|
|
|
|
|
def run_sql_retention_plan(*, host: str, kind: str, protect_bases: bool) -> dict[str, Any]:
    """Build a retention plan for *host* from the SQL snapshot catalog.

    Args:
        host: Host name; passed through ``sanitize_host`` first.
        kind: Snapshot kind the policy applies to: ``"scheduled"``, ``"manual"`` or ``"all"``.
        protect_bases: When true, ``apply_base_protection`` may add snapshots to the
            keep set (and adjust their reasons).

    Returns:
        A dict describing which snapshots are kept and which would be deleted, the
        per-snapshot keep reasons, and the host's incomplete snapshots. Incomplete
        snapshots are listed with their review state only; they never appear in
        ``delete``. Reviewed/unreviewed counts for them are included.

    Raises:
        ConfigError: If *kind* is not one of the accepted values, or the host is not
            an enabled ``HostConfig``.
    """
    host = sanitize_host(host)
    if kind not in {"scheduled", "manual", "all"}:
        raise ConfigError("kind must be scheduled, manual, or all")

    host_config = _enabled_host_config(host)
    retention = _retention_for_host(host_config)
    snapshots = _snapshots_for_retention(host_config=host_config, kind=kind)
    incomplete_items = _incomplete_snapshot_items_for_host(host_config)

    plan = build_retention_plan(snapshots=snapshots, retention=retention, now=datetime.now(timezone.utc))
    keep, reasons = set(plan.keep), dict(plan.reasons)
    if protect_bases:
        keep, reasons = apply_base_protection(snapshots=snapshots, keep=keep, reasons=reasons)

    # Partition in catalog order; anything not in the keep set is a deletion candidate.
    kept_entries: list[dict[str, Any]] = []
    doomed_entries: list[dict[str, Any]] = []
    for snap in snapshots:
        if snap.dirname in keep:
            kept_entries.append(_snapshot_to_item(snap, reasons=reasons.get(snap.dirname, [])))
        else:
            doomed_entries.append(_snapshot_to_item(snap, reasons=["outside retention policy"]))

    reviewed_total = sum(1 for entry in incomplete_items if entry["reviewed"])

    return {
        "ok": True,
        "host": host,
        "kind": kind,
        "protect_bases": bool(protect_bases),
        "retention": retention,
        "source": "sql",
        "keep": sorted(keep),
        "keep_items": kept_entries,
        "delete": doomed_entries,
        "incomplete": incomplete_items,
        "incomplete_reviewed_count": reviewed_total,
        "incomplete_unreviewed_count": len(incomplete_items) - reviewed_total,
        "reasons": reasons,
    }
|
|
|
|
|
|
def run_sql_retention_apply(
    *,
    prefix: Path,
    host: str,
    kind: str,
    protect_bases: bool,
    yes: bool,
    max_delete: int,
    action: str = PurgedSnapshot.Action.MANUAL,
    triggered_by: str = "",
    acquire_lock: bool = True,
) -> dict[str, Any]:
    """Delete the snapshots that the SQL retention plan marks for deletion.

    The plan is recomputed via ``run_sql_retention_plan`` (inside the host lock when
    *acquire_lock* is true). Each deleted snapshot gets a ``PurgedSnapshot`` audit row
    and its ``SnapshotRecord`` is removed. Directories that are already missing are
    reported as skipped and their catalog rows are left in place.

    Args:
        prefix: pobsync home directory; used to locate the lock directory.
        host: Host name; passed through ``sanitize_host`` first.
        kind: Snapshot kind for the plan (``"scheduled"``, ``"manual"`` or ``"all"``).
        protect_bases: Forwarded to the plan.
        yes: Must be true; deletion is refused otherwise.
        max_delete: Upper bound on planned deletions; ``0`` blocks any deletion.
        action: ``PurgedSnapshot.action`` recorded for each deletion.
        triggered_by: Free-form actor string stored on the audit rows.
        acquire_lock: When false, the caller is responsible for holding the host lock.

    Returns:
        A summary dict with the planned delete count, the deleted items and a list
        of human-readable action strings.

    Raises:
        ConfigError: On missing confirmation, an invalid or exceeded ``max_delete``,
            malformed plan output, or an unsafe / unexpected snapshot path.
    """
    host = sanitize_host(host)
    if not yes:
        raise ConfigError("Refusing to delete snapshots without --yes")
    if max_delete < 0:
        raise ConfigError("--max-delete must be >= 0")

    paths = PobsyncPaths(home=prefix)

    def _do_apply() -> dict[str, Any]:
        plan = run_sql_retention_plan(host=host, kind=kind, protect_bases=bool(protect_bases))
        delete_list = plan.get("delete") or []
        incomplete_list = plan.get("incomplete") or []
        if not isinstance(delete_list, list):
            raise ConfigError("Invalid retention plan output: delete is not a list")
        if not isinstance(incomplete_list, list):
            raise ConfigError("Invalid retention plan output: incomplete is not a list")
        if max_delete == 0 and len(delete_list) > 0:
            raise ConfigError("Deletion blocked by --max-delete=0")
        if len(delete_list) > max_delete:
            raise ConfigError(f"Refusing to delete {len(delete_list)} snapshots (exceeds --max-delete={max_delete})")

        # Resolve the host row once, before anything is removed from disk. It does not
        # change between iterations, and a lookup failure must not happen after a tree
        # is already gone but before its PurgedSnapshot audit row is written.
        host_config = _enabled_host_config(host)

        actions: list[str] = []
        deleted: list[dict[str, Any]] = []

        for item in delete_list:
            dirname = item.get("dirname") if isinstance(item, dict) else None
            snap_kind = item.get("kind") if isinstance(item, dict) else None
            snap_path = item.get("path") if isinstance(item, dict) else None
            # Entries that do not look like plan items are skipped rather than guessed at.
            if not isinstance(dirname, str) or not isinstance(snap_kind, str) or not isinstance(snap_path, str):
                continue
            if snap_kind not in {"scheduled", "manual"}:
                raise ConfigError(f"Refusing to delete unsupported snapshot kind: {snap_kind!r}")

            path = _snapshot_delete_path(path=Path(snap_path), dirname=dirname)
            _validate_snapshot_delete_path(host=host, kind=snap_kind, path=path, dirname=dirname)
            reason = str(item.get("reason") or "outside retention policy")
            if not path.exists():
                actions.append(f"skip missing {snap_kind}/{dirname}")
                continue
            if not path.is_dir():
                raise ConfigError(f"Refusing to delete non-directory path: {path}")

            _remove_snapshot_tree(path)
            _record_purged_snapshot(
                host_config=host_config,
                kind=snap_kind,
                dirname=dirname,
                path=path,
                reason=reason,
                action=action,
                triggered_by=triggered_by,
                metadata={"source": "retention", "protect_bases": bool(protect_bases), "retention_kind": kind},
            )
            SnapshotRecord.objects.filter(host__host=host, kind=snap_kind, dirname=dirname).delete()
            actions.append(f"deleted {snap_kind} {dirname}")
            deleted.append({"dirname": dirname, "kind": snap_kind, "path": str(path), "reason": reason})

        return {
            "ok": True,
            "host": host,
            "kind": kind,
            "protect_bases": bool(protect_bases),
            "max_delete": max_delete,
            "source": "sql",
            "planned_delete_count": len(delete_list),
            "incomplete_ignored_count": len(incomplete_list),
            "deleted": deleted,
            "actions": actions,
        }

    if acquire_lock:
        with acquire_host_lock(paths.locks_dir, host, command="retention-apply"):
            return _do_apply()
    return _do_apply()
|
|
|
|
|
|
def run_incomplete_cleanup(
    *,
    prefix: Path,
    host: str,
    yes: bool,
    max_delete: int,
    triggered_by: str = "",
    acquire_lock: bool = True,
) -> dict[str, Any]:
    """Delete the reviewed incomplete snapshots of *host*.

    The cleanup is refused outright while any incomplete snapshot of the host still
    lacks a review. Every reviewed snapshot gets a ``PurgedSnapshot`` audit row and
    its ``SnapshotRecord`` is removed. This also happens when its directory is
    already missing, in which case the action is reported as a skip.

    Args:
        prefix: pobsync home directory; used to locate the lock directory.
        host: Host name; passed through ``sanitize_host`` first.
        yes: Must be true; deletion is refused otherwise.
        max_delete: Upper bound on reviewed snapshots to delete; ``0`` blocks any deletion.
        triggered_by: Free-form actor string stored on the audit rows.
        acquire_lock: When false, the caller is responsible for holding the host lock.

    Returns:
        A summary dict with the planned delete count, the deleted items and a list
        of human-readable action strings.

    Raises:
        ConfigError: On missing confirmation, a negative or exceeded ``max_delete``,
            unreviewed incomplete snapshots, an unexpected path, or a path that
            exists but is not a directory.
    """
    host = sanitize_host(host)
    if not yes:
        raise ConfigError("Refusing to delete incomplete snapshots without --yes")
    if max_delete < 0:
        raise ConfigError("--max-delete must be >= 0")

    paths = PobsyncPaths(home=prefix)

    def _do_cleanup() -> dict[str, Any]:
        host_config = _enabled_host_config(host)

        pending_review = _unreviewed_incomplete_count(host_config)
        if pending_review:
            raise ConfigError(
                f"Refusing to delete {pending_review} incomplete snapshot(s) that have not been reviewed."
            )

        targets = [
            _snapshot_to_item(snap, reasons=["manual incomplete cleanup"])
            for snap in _reviewed_incomplete_snapshots_for_host(host_config)
        ]
        planned = len(targets)
        if max_delete == 0 and planned > 0:
            raise ConfigError("Incomplete cleanup blocked by --max-delete=0")
        if planned > max_delete:
            raise ConfigError(
                f"Refusing to delete {planned} incomplete snapshots (exceeds --max-delete={max_delete})"
            )

        incomplete_kind = SnapshotRecord.Kind.INCOMPLETE
        actions: list[str] = []
        deleted: list[dict[str, Any]] = []

        for target in targets:
            name = target["dirname"]
            tree = _snapshot_delete_path(path=Path(target["path"]), dirname=name)
            _validate_incomplete_delete_path(host=host, path=tree, dirname=name)

            if tree.exists():
                if not tree.is_dir():
                    raise ConfigError(f"Refusing to delete non-directory path: {tree}")
                _remove_snapshot_tree(tree)
                actions.append(f"deleted incomplete {name}")
            else:
                actions.append(f"skip missing incomplete/{name}")

            # The catalog row is purged whether or not the directory was still present.
            _record_purged_snapshot(
                host_config=host_config,
                kind=incomplete_kind,
                dirname=name,
                path=tree,
                reason="manual incomplete cleanup",
                action=PurgedSnapshot.Action.INCOMPLETE_CLEANUP,
                triggered_by=triggered_by,
                metadata={"source": "incomplete_cleanup"},
            )
            SnapshotRecord.objects.filter(host__host=host, kind=incomplete_kind, dirname=name).delete()
            deleted.append({"dirname": name, "kind": incomplete_kind, "path": str(tree)})

        return {
            "ok": True,
            "host": host,
            "kind": incomplete_kind,
            "max_delete": max_delete,
            "source": "sql",
            "planned_delete_count": planned,
            "deleted": deleted,
            "actions": actions,
        }

    if not acquire_lock:
        return _do_cleanup()
    with acquire_host_lock(paths.locks_dir, host, command="incomplete-cleanup"):
        return _do_cleanup()
|
|
|
|
|
|
def _enabled_host_config(host: str) -> HostConfig:
    """Return the enabled ``HostConfig`` row named *host*.

    Raises:
        ConfigError: If no enabled host with that name exists; the ORM's
            ``DoesNotExist`` is chained as the cause.
    """
    lookup = {"host": host, "enabled": True}
    try:
        config = HostConfig.objects.get(**lookup)
    except HostConfig.DoesNotExist as missing:
        raise ConfigError(f"Missing enabled host {host!r}") from missing
    return config
|
|
|
|
|
|
def _retention_for_host(host_config: HostConfig) -> dict[str, int]:
|
|
return {
|
|
"daily": host_config.retention_daily,
|
|
"weekly": host_config.retention_weekly,
|
|
"monthly": host_config.retention_monthly,
|
|
"yearly": host_config.retention_yearly,
|
|
}
|
|
|
|
|
|
def _snapshots_for_retention(*, host_config: HostConfig, kind: str) -> list[Snapshot]:
    """Load the host's snapshots of the requested kind(s) as ``Snapshot`` values.

    ``kind == "all"`` selects both scheduled and manual snapshots. Incomplete
    snapshots are excluded explicitly. Rows are ordered by descending
    ``started_at``, then ``dirname``.
    """
    wanted_kinds = ["scheduled", "manual"] if kind == "all" else [kind]
    queryset = SnapshotRecord.objects.filter(host=host_config, kind__in=wanted_kinds)
    queryset = queryset.exclude(kind=SnapshotRecord.Kind.INCOMPLETE)
    queryset = queryset.select_related("base").order_by("-started_at", "dirname")
    return list(map(_snapshot_from_record, queryset))
|
|
|
|
|
|
def _incomplete_snapshot_items_for_host(host_config: HostConfig) -> list[dict[str, Any]]:
    """Return plan items, including review state, for every incomplete snapshot of the host.

    Items are ordered by descending ``started_at``, then ``dirname``.
    """
    note = "incomplete snapshot; excluded from retention cleanup"
    queryset = (
        SnapshotRecord.objects.filter(host=host_config, kind=SnapshotRecord.Kind.INCOMPLETE)
        .select_related("base")
        .order_by("-started_at", "dirname")
    )
    items: list[dict[str, Any]] = []
    for row in queryset:
        items.append(_snapshot_record_to_item(row, reasons=[note]))
    return items
|
|
|
|
|
|
def _reviewed_incomplete_snapshots_for_host(host_config: HostConfig) -> list[Snapshot]:
    """Return the host's incomplete snapshots whose ``reviewed_at`` is set.

    Results are ordered by descending ``started_at``, then ``dirname``.
    """
    criteria = {
        "host": host_config,
        "kind": SnapshotRecord.Kind.INCOMPLETE,
        "reviewed_at__isnull": False,
    }
    queryset = SnapshotRecord.objects.filter(**criteria).select_related("base").order_by("-started_at", "dirname")
    return [_snapshot_from_record(row) for row in queryset]
|
|
|
|
|
|
def _unreviewed_incomplete_count(host_config: HostConfig) -> int:
    """Count the host's incomplete snapshots that have no ``reviewed_at`` timestamp."""
    pending = SnapshotRecord.objects.filter(
        host=host_config,
        kind=SnapshotRecord.Kind.INCOMPLETE,
        reviewed_at__isnull=True,
    )
    return pending.count()
|
|
|
|
|
|
def _snapshot_from_record(record: SnapshotRecord) -> Snapshot:
    """Convert a catalog row into the ``Snapshot`` value used by the retention helpers.

    A falsy ``started_at`` falls back to the Unix epoch in UTC, and a falsy
    ``status`` becomes ``None``.
    """
    started = record.started_at or datetime.fromtimestamp(0, tz=timezone.utc)
    return Snapshot(
        kind=record.kind,
        dirname=record.dirname,
        path=record.path,
        dt=started,
        status=record.status or None,
        base=_base_meta_from_record(record),
    )
|
|
|
|
|
|
def _base_meta_from_record(record: SnapshotRecord) -> dict[str, str] | None:
|
|
if record.base is not None:
|
|
return {
|
|
"kind": record.base.kind,
|
|
"dirname": record.base.dirname,
|
|
"path": record.base.path,
|
|
}
|
|
if record.base_kind and record.base_dirname:
|
|
return {
|
|
"kind": record.base_kind,
|
|
"dirname": record.base_dirname,
|
|
"path": record.base_path,
|
|
}
|
|
return None
|
|
|
|
|
|
def _snapshot_to_item(snapshot: Snapshot, *, reasons: list[str]) -> dict[str, Any]:
|
|
return {
|
|
"dirname": snapshot.dirname,
|
|
"kind": snapshot.kind,
|
|
"path": snapshot.path,
|
|
"dt": snapshot.dt.isoformat(),
|
|
"status": snapshot.status,
|
|
"reasons": reasons,
|
|
"reason": ", ".join(reasons),
|
|
}
|
|
|
|
|
|
def _snapshot_record_to_item(record: SnapshotRecord, *, reasons: list[str]) -> dict[str, Any]:
    """Serialize a catalog row into a plan item that also carries its review state.

    Adds ``reviewed`` (bool), ``reviewed_at`` (ISO string, or ``""`` when unset) and
    ``reviewed_by`` (copied from the row).
    """
    item = _snapshot_to_item(_snapshot_from_record(record), reasons=reasons)
    reviewed_at = record.reviewed_at
    item.update(
        reviewed=reviewed_at is not None,
        reviewed_at=reviewed_at.isoformat() if reviewed_at else "",
        reviewed_by=record.reviewed_by,
    )
    return item
|
|
|
|
|
|
def _snapshot_delete_path(*, path: Path, dirname: str) -> Path:
|
|
if path.name == "data" and path.parent.name == dirname:
|
|
return path.parent
|
|
return path
|
|
|
|
|
|
def _record_purged_snapshot(
    *,
    host_config: HostConfig,
    kind: str,
    dirname: str,
    path: Path,
    reason: str,
    action: str,
    triggered_by: str,
    metadata: dict[str, Any],
) -> None:
    """Insert a ``PurgedSnapshot`` audit row for a removed snapshot.

    The host name is copied into ``host_name`` alongside the ``host`` FK, and
    *path* is stored as a string.
    """
    row_fields: dict[str, Any] = {
        "host": host_config,
        "host_name": host_config.host,
        "kind": kind,
        "dirname": dirname,
        "path": str(path),
        "reason": reason,
        "action": action,
        "triggered_by": triggered_by,
        "metadata": metadata,
    }
    PurgedSnapshot.objects.create(**row_fields)
|
|
|
|
|
|
def _validate_incomplete_delete_path(*, host: str, path: Path, dirname: str) -> None:
|
|
path_parts = path.parts
|
|
if path.name != dirname or ".incomplete" not in path_parts or host not in path_parts:
|
|
raise ConfigError(f"Refusing to delete unexpected incomplete snapshot path: {path}")
|
|
incomplete_index = path_parts.index(".incomplete")
|
|
if incomplete_index == 0 or path_parts[incomplete_index - 1] != host:
|
|
raise ConfigError(f"Refusing to delete incomplete snapshot outside host backup root: {path}")
|
|
|
|
|
|
def _validate_snapshot_delete_path(*, host: str, kind: str, path: Path, dirname: str) -> None:
    """Refuse to delete anything that does not look like a *kind* snapshot of *host*.

    *kind* must be scheduled or manual. The path must end in *dirname* and contain
    a *kind* component. The first such component must sit directly below a
    component equal to *host*. The path must also contain no ``..`` components.

    Raises:
        ConfigError: If any of these checks fails.
    """
    if kind not in {SnapshotRecord.Kind.SCHEDULED, SnapshotRecord.Kind.MANUAL}:
        raise ConfigError(f"Refusing to delete unsupported snapshot kind: {kind!r}")
    path_parts = path.parts
    if path.name != dirname or kind not in path_parts or host not in path_parts:
        raise ConfigError(f"Refusing to delete unexpected snapshot path: {path}")
    kind_index = path_parts.index(kind)
    if kind_index == 0 or path_parts[kind_index - 1] != host:
        raise ConfigError(f"Refusing to delete snapshot outside host backup root: {path}")
    # The checks above are purely lexical. "<host>/<kind>/../../x/<dirname>" passes them
    # but resolves outside the backup root, so parent references are rejected outright.
    if ".." in path_parts:
        raise ConfigError(f"Refusing to delete snapshot path with '..' components: {path}")
|
|
|
|
|
|
def _remove_snapshot_tree(path: Path) -> None:
    """Recursively delete the snapshot directory at *path*.

    Owner permissions on the tree's directories are loosened first. Any entry
    whose removal still fails is passed to ``_retry_remove_with_user_permissions``
    through ``shutil.rmtree``'s ``onexc`` hook, which requires Python 3.12+.
    """
    _make_snapshot_tree_user_removable(path)
    on_failure = _retry_remove_with_user_permissions
    shutil.rmtree(path, onexc=on_failure)
|
|
|
|
|
|
def _make_snapshot_tree_user_removable(path: Path) -> None:
    """Best-effort pass that runs ``_make_path_user_removable`` on every real directory under *path*.

    Each directory is adjusted *before* it is listed, so directories that could not
    be listed originally can still be descended into. Symlinks, including *path*
    itself, are never followed. Directories that still cannot be listed are skipped
    silently.
    """
    pending = [path]
    while pending:
        current = pending.pop()
        if current.is_symlink():
            continue
        _make_path_user_removable(current)
        try:
            entries = list(current.iterdir())
        except OSError:
            continue
        pending.extend(entry for entry in entries if entry.is_dir() and not entry.is_symlink())
|
|
|
|
|
|
def _retry_remove_with_user_permissions(function: Any, path: str, excinfo: BaseException) -> None:
|
|
failed_path = Path(path)
|
|
_make_path_user_removable(failed_path)
|
|
function(path)
|
|
|
|
|
|
def _make_path_user_removable(path: Path) -> None:
|
|
try:
|
|
mode = path.stat().st_mode
|
|
except OSError:
|
|
return
|
|
wanted = stat.S_IRUSR | stat.S_IWUSR
|
|
if path.is_dir() and not path.is_symlink():
|
|
wanted |= stat.S_IXUSR
|
|
if mode & wanted == wanted:
|
|
return
|
|
try:
|
|
path.chmod(mode | wanted)
|
|
except OSError:
|
|
return
|