from __future__ import annotations import shutil import stat from datetime import datetime, timezone from pathlib import Path from typing import Any from pobsync.errors import ConfigError from pobsync.lock import acquire_host_lock from pobsync.paths import PobsyncPaths from pobsync.retention import Snapshot, apply_base_protection, build_retention_plan from pobsync.util import sanitize_host from .models import HostConfig, SnapshotRecord def run_sql_retention_plan(*, host: str, kind: str, protect_bases: bool) -> dict[str, Any]: host = sanitize_host(host) if kind not in {"scheduled", "manual", "all"}: raise ConfigError("kind must be scheduled, manual, or all") host_config = _enabled_host_config(host) retention = _retention_for_host(host_config) snapshots = _snapshots_for_retention(host_config=host_config, kind=kind) incomplete_snapshots = _incomplete_snapshots_for_host(host_config) plan = build_retention_plan( snapshots=snapshots, retention=retention, now=datetime.now(timezone.utc), ) keep = set(plan.keep) reasons = dict(plan.reasons) if protect_bases: keep, reasons = apply_base_protection(snapshots=snapshots, keep=keep, reasons=reasons) delete = [snapshot for snapshot in snapshots if snapshot.dirname not in keep] keep_items = [snapshot for snapshot in snapshots if snapshot.dirname in keep] return { "ok": True, "host": host, "kind": kind, "protect_bases": bool(protect_bases), "retention": retention, "source": "sql", "keep": sorted(keep), "keep_items": [_snapshot_to_item(snapshot, reasons=reasons.get(snapshot.dirname, [])) for snapshot in keep_items], "delete": [_snapshot_to_item(snapshot, reasons=["outside retention policy"]) for snapshot in delete], "incomplete": [ _snapshot_to_item(snapshot, reasons=["incomplete snapshot; excluded from retention cleanup"]) for snapshot in incomplete_snapshots ], "reasons": reasons, } def run_sql_retention_apply( *, prefix: Path, host: str, kind: str, protect_bases: bool, yes: bool, max_delete: int, acquire_lock: bool = True, ) -> dict[str, Any]: host = sanitize_host(host) if not yes: raise ConfigError("Refusing to delete snapshots without --yes") if max_delete < 0: raise ConfigError("--max-delete must be >= 0") paths = PobsyncPaths(home=prefix) def _do_apply() -> dict[str, Any]: plan = run_sql_retention_plan(host=host, kind=kind, protect_bases=bool(protect_bases)) delete_list = plan.get("delete") or [] incomplete_list = plan.get("incomplete") or [] if not isinstance(delete_list, list): raise ConfigError("Invalid retention plan output: delete is not a list") if not isinstance(incomplete_list, list): raise ConfigError("Invalid retention plan output: incomplete is not a list") if max_delete == 0 and len(delete_list) > 0: raise ConfigError("Deletion blocked by --max-delete=0") if len(delete_list) > max_delete: raise ConfigError(f"Refusing to delete {len(delete_list)} snapshots (exceeds --max-delete={max_delete})") actions: list[str] = [] deleted: list[dict[str, Any]] = [] for item in delete_list: dirname = item.get("dirname") if isinstance(item, dict) else None snap_kind = item.get("kind") if isinstance(item, dict) else None snap_path = item.get("path") if isinstance(item, dict) else None if not isinstance(dirname, str) or not isinstance(snap_kind, str) or not isinstance(snap_path, str): continue if snap_kind not in {"scheduled", "manual"}: raise ConfigError(f"Refusing to delete unsupported snapshot kind: {snap_kind!r}") path = _snapshot_delete_path(path=Path(snap_path), dirname=dirname) if not path.exists(): actions.append(f"skip missing {snap_kind}/{dirname}") continue if not path.is_dir(): raise ConfigError(f"Refusing to delete non-directory path: {path}") _remove_snapshot_tree(path) SnapshotRecord.objects.filter(host__host=host, kind=snap_kind, dirname=dirname).delete() actions.append(f"deleted {snap_kind} {dirname}") deleted.append({"dirname": dirname, "kind": snap_kind, "path": str(path)}) return { "ok": True, "host": host, "kind": kind, "protect_bases": bool(protect_bases), "max_delete": max_delete, "source": "sql", "planned_delete_count": len(delete_list), "incomplete_ignored_count": len(incomplete_list), "deleted": deleted, "actions": actions, } if acquire_lock: with acquire_host_lock(paths.locks_dir, host, command="retention-apply"): return _do_apply() return _do_apply() def run_incomplete_cleanup( *, prefix: Path, host: str, yes: bool, max_delete: int, acquire_lock: bool = True, ) -> dict[str, Any]: host = sanitize_host(host) if not yes: raise ConfigError("Refusing to delete incomplete snapshots without --yes") if max_delete < 0: raise ConfigError("--max-delete must be >= 0") paths = PobsyncPaths(home=prefix) def _do_cleanup() -> dict[str, Any]: host_config = _enabled_host_config(host) incomplete_list = [ _snapshot_to_item(snapshot, reasons=["manual incomplete cleanup"]) for snapshot in _incomplete_snapshots_for_host(host_config) ] if max_delete == 0 and len(incomplete_list) > 0: raise ConfigError("Incomplete cleanup blocked by --max-delete=0") if len(incomplete_list) > max_delete: raise ConfigError( f"Refusing to delete {len(incomplete_list)} incomplete snapshots (exceeds --max-delete={max_delete})" ) actions: list[str] = [] deleted: list[dict[str, Any]] = [] for item in incomplete_list: dirname = item["dirname"] snap_path = Path(item["path"]) path = _snapshot_delete_path(path=snap_path, dirname=dirname) _validate_incomplete_delete_path(host=host, path=path, dirname=dirname) if not path.exists(): actions.append(f"skip missing incomplete/{dirname}") elif not path.is_dir(): raise ConfigError(f"Refusing to delete non-directory path: {path}") else: _remove_snapshot_tree(path) actions.append(f"deleted incomplete {dirname}") SnapshotRecord.objects.filter( host__host=host, kind=SnapshotRecord.Kind.INCOMPLETE, dirname=dirname, ).delete() deleted.append({"dirname": dirname, "kind": SnapshotRecord.Kind.INCOMPLETE, "path": str(path)}) return { "ok": True, "host": host, "kind": SnapshotRecord.Kind.INCOMPLETE, "max_delete": max_delete, "source": "sql", "planned_delete_count": len(incomplete_list), "deleted": deleted, "actions": actions, } if acquire_lock: with acquire_host_lock(paths.locks_dir, host, command="incomplete-cleanup"): return _do_cleanup() return _do_cleanup() def _enabled_host_config(host: str) -> HostConfig: try: return HostConfig.objects.get(host=host, enabled=True) except HostConfig.DoesNotExist as exc: raise ConfigError(f"Missing enabled host {host!r}") from exc def _retention_for_host(host_config: HostConfig) -> dict[str, int]: return { "daily": host_config.retention_daily, "weekly": host_config.retention_weekly, "monthly": host_config.retention_monthly, "yearly": host_config.retention_yearly, } def _snapshots_for_retention(*, host_config: HostConfig, kind: str) -> list[Snapshot]: kinds = ["scheduled", "manual"] if kind == "all" else [kind] records = ( SnapshotRecord.objects.filter(host=host_config, kind__in=kinds) .exclude(kind=SnapshotRecord.Kind.INCOMPLETE) .select_related("base") .order_by("-started_at", "dirname") ) return [_snapshot_from_record(record) for record in records] def _incomplete_snapshots_for_host(host_config: HostConfig) -> list[Snapshot]: records = ( SnapshotRecord.objects.filter(host=host_config, kind=SnapshotRecord.Kind.INCOMPLETE) .select_related("base") .order_by("-started_at", "dirname") ) return [_snapshot_from_record(record) for record in records] def _snapshot_from_record(record: SnapshotRecord) -> Snapshot: return Snapshot( kind=record.kind, dirname=record.dirname, path=record.path, dt=record.started_at or datetime.fromtimestamp(0, tz=timezone.utc), status=record.status or None, base=_base_meta_from_record(record), ) def _base_meta_from_record(record: SnapshotRecord) -> dict[str, str] | None: if record.base is not None: return { "kind": record.base.kind, "dirname": record.base.dirname, "path": record.base.path, } if record.base_kind and record.base_dirname: return { "kind": record.base_kind, "dirname": record.base_dirname, "path": record.base_path, } return None def _snapshot_to_item(snapshot: Snapshot, *, reasons: list[str]) -> dict[str, Any]: return { "dirname": snapshot.dirname, "kind": snapshot.kind, "path": snapshot.path, "dt": snapshot.dt.isoformat(), "status": snapshot.status, "reasons": reasons, "reason": ", ".join(reasons), } def _snapshot_delete_path(*, path: Path, dirname: str) -> Path: if path.name == "data" and path.parent.name == dirname: return path.parent return path def _validate_incomplete_delete_path(*, host: str, path: Path, dirname: str) -> None: path_parts = path.parts if path.name != dirname or ".incomplete" not in path_parts or host not in path_parts: raise ConfigError(f"Refusing to delete unexpected incomplete snapshot path: {path}") incomplete_index = path_parts.index(".incomplete") if incomplete_index == 0 or path_parts[incomplete_index - 1] != host: raise ConfigError(f"Refusing to delete incomplete snapshot outside host backup root: {path}") def _remove_snapshot_tree(path: Path) -> None: _make_directories_user_writable(path) shutil.rmtree(path) def _make_directories_user_writable(path: Path) -> None: for directory in [path, *[child for child in path.rglob("*") if child.is_dir() and not child.is_symlink()]]: mode = directory.stat().st_mode if mode & stat.S_IWUSR: continue directory.chmod(mode | stat.S_IWUSR)