from __future__ import annotations import shutil import stat from datetime import datetime, timezone from pathlib import Path from typing import Any from pobsync.errors import ConfigError from pobsync.lock import acquire_host_lock from pobsync.paths import PobsyncPaths from pobsync.retention import Snapshot, apply_base_protection, build_retention_plan from pobsync.util import sanitize_host from .models import HostConfig, PurgedSnapshot, SnapshotRecord def run_sql_retention_plan(*, host: str, kind: str, protect_bases: bool) -> dict[str, Any]: host = sanitize_host(host) if kind not in {"scheduled", "manual", "all"}: raise ConfigError("kind must be scheduled, manual, or all") host_config = _enabled_host_config(host) retention = _retention_for_host(host_config) snapshots = _snapshots_for_retention(host_config=host_config, kind=kind) incomplete_snapshots = _incomplete_snapshots_for_host(host_config) plan = build_retention_plan( snapshots=snapshots, retention=retention, now=datetime.now(timezone.utc), ) keep = set(plan.keep) reasons = dict(plan.reasons) if protect_bases: keep, reasons = apply_base_protection(snapshots=snapshots, keep=keep, reasons=reasons) delete = [snapshot for snapshot in snapshots if snapshot.dirname not in keep] keep_items = [snapshot for snapshot in snapshots if snapshot.dirname in keep] return { "ok": True, "host": host, "kind": kind, "protect_bases": bool(protect_bases), "retention": retention, "source": "sql", "keep": sorted(keep), "keep_items": [_snapshot_to_item(snapshot, reasons=reasons.get(snapshot.dirname, [])) for snapshot in keep_items], "delete": [_snapshot_to_item(snapshot, reasons=["outside retention policy"]) for snapshot in delete], "incomplete": [ _snapshot_to_item(snapshot, reasons=["incomplete snapshot; excluded from retention cleanup"]) for snapshot in incomplete_snapshots ], "reasons": reasons, } def run_sql_retention_apply( *, prefix: Path, host: str, kind: str, protect_bases: bool, yes: bool, max_delete: int, action: str = PurgedSnapshot.Action.MANUAL, triggered_by: str = "", acquire_lock: bool = True, ) -> dict[str, Any]: host = sanitize_host(host) if not yes: raise ConfigError("Refusing to delete snapshots without --yes") if max_delete < 0: raise ConfigError("--max-delete must be >= 0") paths = PobsyncPaths(home=prefix) def _do_apply() -> dict[str, Any]: plan = run_sql_retention_plan(host=host, kind=kind, protect_bases=bool(protect_bases)) delete_list = plan.get("delete") or [] incomplete_list = plan.get("incomplete") or [] if not isinstance(delete_list, list): raise ConfigError("Invalid retention plan output: delete is not a list") if not isinstance(incomplete_list, list): raise ConfigError("Invalid retention plan output: incomplete is not a list") if max_delete == 0 and len(delete_list) > 0: raise ConfigError("Deletion blocked by --max-delete=0") if len(delete_list) > max_delete: raise ConfigError(f"Refusing to delete {len(delete_list)} snapshots (exceeds --max-delete={max_delete})") actions: list[str] = [] deleted: list[dict[str, Any]] = [] for item in delete_list: dirname = item.get("dirname") if isinstance(item, dict) else None snap_kind = item.get("kind") if isinstance(item, dict) else None snap_path = item.get("path") if isinstance(item, dict) else None if not isinstance(dirname, str) or not isinstance(snap_kind, str) or not isinstance(snap_path, str): continue if snap_kind not in {"scheduled", "manual"}: raise ConfigError(f"Refusing to delete unsupported snapshot kind: {snap_kind!r}") path = _snapshot_delete_path(path=Path(snap_path), dirname=dirname) _validate_snapshot_delete_path(host=host, kind=snap_kind, path=path, dirname=dirname) reason = str(item.get("reason") or "outside retention policy") if not path.exists(): actions.append(f"skip missing {snap_kind}/{dirname}") continue if not path.is_dir(): raise ConfigError(f"Refusing to delete non-directory path: {path}") _remove_snapshot_tree(path) _record_purged_snapshot( host_config=_enabled_host_config(host), kind=snap_kind, dirname=dirname, path=path, reason=reason, action=action, triggered_by=triggered_by, metadata={"source": "retention", "protect_bases": bool(protect_bases), "retention_kind": kind}, ) SnapshotRecord.objects.filter(host__host=host, kind=snap_kind, dirname=dirname).delete() actions.append(f"deleted {snap_kind} {dirname}") deleted.append({"dirname": dirname, "kind": snap_kind, "path": str(path), "reason": reason}) return { "ok": True, "host": host, "kind": kind, "protect_bases": bool(protect_bases), "max_delete": max_delete, "source": "sql", "planned_delete_count": len(delete_list), "incomplete_ignored_count": len(incomplete_list), "deleted": deleted, "actions": actions, } if acquire_lock: with acquire_host_lock(paths.locks_dir, host, command="retention-apply"): return _do_apply() return _do_apply() def run_incomplete_cleanup( *, prefix: Path, host: str, yes: bool, max_delete: int, triggered_by: str = "", acquire_lock: bool = True, ) -> dict[str, Any]: host = sanitize_host(host) if not yes: raise ConfigError("Refusing to delete incomplete snapshots without --yes") if max_delete < 0: raise ConfigError("--max-delete must be >= 0") paths = PobsyncPaths(home=prefix) def _do_cleanup() -> dict[str, Any]: host_config = _enabled_host_config(host) incomplete_list = [ _snapshot_to_item(snapshot, reasons=["manual incomplete cleanup"]) for snapshot in _incomplete_snapshots_for_host(host_config) ] if max_delete == 0 and len(incomplete_list) > 0: raise ConfigError("Incomplete cleanup blocked by --max-delete=0") if len(incomplete_list) > max_delete: raise ConfigError( f"Refusing to delete {len(incomplete_list)} incomplete snapshots (exceeds --max-delete={max_delete})" ) actions: list[str] = [] deleted: list[dict[str, Any]] = [] for item in incomplete_list: dirname = item["dirname"] snap_path = Path(item["path"]) path = _snapshot_delete_path(path=snap_path, dirname=dirname) _validate_incomplete_delete_path(host=host, path=path, dirname=dirname) if not path.exists(): actions.append(f"skip missing incomplete/{dirname}") elif not path.is_dir(): raise ConfigError(f"Refusing to delete non-directory path: {path}") else: _remove_snapshot_tree(path) actions.append(f"deleted incomplete {dirname}") _record_purged_snapshot( host_config=host_config, kind=SnapshotRecord.Kind.INCOMPLETE, dirname=dirname, path=path, reason="manual incomplete cleanup", action=PurgedSnapshot.Action.INCOMPLETE_CLEANUP, triggered_by=triggered_by, metadata={"source": "incomplete_cleanup"}, ) SnapshotRecord.objects.filter( host__host=host, kind=SnapshotRecord.Kind.INCOMPLETE, dirname=dirname, ).delete() deleted.append({"dirname": dirname, "kind": SnapshotRecord.Kind.INCOMPLETE, "path": str(path)}) return { "ok": True, "host": host, "kind": SnapshotRecord.Kind.INCOMPLETE, "max_delete": max_delete, "source": "sql", "planned_delete_count": len(incomplete_list), "deleted": deleted, "actions": actions, } if acquire_lock: with acquire_host_lock(paths.locks_dir, host, command="incomplete-cleanup"): return _do_cleanup() return _do_cleanup() def _enabled_host_config(host: str) -> HostConfig: try: return HostConfig.objects.get(host=host, enabled=True) except HostConfig.DoesNotExist as exc: raise ConfigError(f"Missing enabled host {host!r}") from exc def _retention_for_host(host_config: HostConfig) -> dict[str, int]: return { "daily": host_config.retention_daily, "weekly": host_config.retention_weekly, "monthly": host_config.retention_monthly, "yearly": host_config.retention_yearly, } def _snapshots_for_retention(*, host_config: HostConfig, kind: str) -> list[Snapshot]: kinds = ["scheduled", "manual"] if kind == "all" else [kind] records = ( SnapshotRecord.objects.filter(host=host_config, kind__in=kinds) .exclude(kind=SnapshotRecord.Kind.INCOMPLETE) .select_related("base") .order_by("-started_at", "dirname") ) return [_snapshot_from_record(record) for record in records] def _incomplete_snapshots_for_host(host_config: HostConfig) -> list[Snapshot]: records = ( SnapshotRecord.objects.filter(host=host_config, kind=SnapshotRecord.Kind.INCOMPLETE) .select_related("base") .order_by("-started_at", "dirname") ) return [_snapshot_from_record(record) for record in records] def _snapshot_from_record(record: SnapshotRecord) -> Snapshot: return Snapshot( kind=record.kind, dirname=record.dirname, path=record.path, dt=record.started_at or datetime.fromtimestamp(0, tz=timezone.utc), status=record.status or None, base=_base_meta_from_record(record), ) def _base_meta_from_record(record: SnapshotRecord) -> dict[str, str] | None: if record.base is not None: return { "kind": record.base.kind, "dirname": record.base.dirname, "path": record.base.path, } if record.base_kind and record.base_dirname: return { "kind": record.base_kind, "dirname": record.base_dirname, "path": record.base_path, } return None def _snapshot_to_item(snapshot: Snapshot, *, reasons: list[str]) -> dict[str, Any]: return { "dirname": snapshot.dirname, "kind": snapshot.kind, "path": snapshot.path, "dt": snapshot.dt.isoformat(), "status": snapshot.status, "reasons": reasons, "reason": ", ".join(reasons), } def _snapshot_delete_path(*, path: Path, dirname: str) -> Path: if path.name == "data" and path.parent.name == dirname: return path.parent return path def _record_purged_snapshot( *, host_config: HostConfig, kind: str, dirname: str, path: Path, reason: str, action: str, triggered_by: str, metadata: dict[str, Any], ) -> None: PurgedSnapshot.objects.create( host=host_config, host_name=host_config.host, kind=kind, dirname=dirname, path=str(path), reason=reason, action=action, triggered_by=triggered_by, metadata=metadata, ) def _validate_incomplete_delete_path(*, host: str, path: Path, dirname: str) -> None: path_parts = path.parts if path.name != dirname or ".incomplete" not in path_parts or host not in path_parts: raise ConfigError(f"Refusing to delete unexpected incomplete snapshot path: {path}") incomplete_index = path_parts.index(".incomplete") if incomplete_index == 0 or path_parts[incomplete_index - 1] != host: raise ConfigError(f"Refusing to delete incomplete snapshot outside host backup root: {path}") def _validate_snapshot_delete_path(*, host: str, kind: str, path: Path, dirname: str) -> None: if kind not in {SnapshotRecord.Kind.SCHEDULED, SnapshotRecord.Kind.MANUAL}: raise ConfigError(f"Refusing to delete unsupported snapshot kind: {kind!r}") path_parts = path.parts if path.name != dirname or kind not in path_parts or host not in path_parts: raise ConfigError(f"Refusing to delete unexpected snapshot path: {path}") kind_index = path_parts.index(kind) if kind_index == 0 or path_parts[kind_index - 1] != host: raise ConfigError(f"Refusing to delete snapshot outside host backup root: {path}") def _remove_snapshot_tree(path: Path) -> None: _make_snapshot_tree_user_removable(path) shutil.rmtree(path, onexc=_retry_remove_with_user_permissions) def _make_snapshot_tree_user_removable(path: Path) -> None: stack = [path] while stack: directory = stack.pop() if directory.is_symlink(): continue _make_path_user_removable(directory) try: children = list(directory.iterdir()) except OSError: continue for child in children: if child.is_dir() and not child.is_symlink(): stack.append(child) def _retry_remove_with_user_permissions(function: Any, path: str, excinfo: BaseException) -> None: failed_path = Path(path) _make_path_user_removable(failed_path) function(path) def _make_path_user_removable(path: Path) -> None: try: mode = path.stat().st_mode except OSError: return wanted = stat.S_IRUSR | stat.S_IWUSR if path.is_dir() and not path.is_symlink(): wanted |= stat.S_IXUSR if mode & wanted == wanted: return try: path.chmod(mode | wanted) except OSError: return