Plan Django retention from snapshot records
This commit is contained in:
182
src/pobsync_backend/retention.py
Normal file
182
src/pobsync_backend/retention.py
Normal file
@@ -0,0 +1,182 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import shutil
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
from pobsync.errors import ConfigError
|
||||
from pobsync.lock import acquire_host_lock
|
||||
from pobsync.paths import PobsyncPaths
|
||||
from pobsync.retention import Snapshot, apply_base_protection, build_retention_plan
|
||||
from pobsync.util import sanitize_host
|
||||
|
||||
from .models import HostConfig, SnapshotRecord
|
||||
|
||||
|
||||
def run_sql_retention_plan(*, host: str, kind: str, protect_bases: bool) -> dict[str, Any]:
|
||||
host = sanitize_host(host)
|
||||
if kind not in {"scheduled", "manual", "all"}:
|
||||
raise ConfigError("kind must be scheduled, manual, or all")
|
||||
|
||||
host_config = _enabled_host_config(host)
|
||||
retention = _retention_for_host(host_config)
|
||||
snapshots = _snapshots_for_retention(host_config=host_config, kind=kind)
|
||||
|
||||
plan = build_retention_plan(
|
||||
snapshots=snapshots,
|
||||
retention=retention,
|
||||
now=datetime.now(timezone.utc),
|
||||
)
|
||||
|
||||
keep = set(plan.keep)
|
||||
reasons = dict(plan.reasons)
|
||||
if protect_bases:
|
||||
keep, reasons = apply_base_protection(snapshots=snapshots, keep=keep, reasons=reasons)
|
||||
|
||||
delete = [snapshot for snapshot in snapshots if snapshot.dirname not in keep]
|
||||
|
||||
return {
|
||||
"ok": True,
|
||||
"host": host,
|
||||
"kind": kind,
|
||||
"protect_bases": bool(protect_bases),
|
||||
"retention": retention,
|
||||
"source": "sql",
|
||||
"keep": sorted(keep),
|
||||
"delete": [_snapshot_to_delete_item(snapshot) for snapshot in delete],
|
||||
"reasons": reasons,
|
||||
}
|
||||
|
||||
|
||||
def run_sql_retention_apply(
|
||||
*,
|
||||
prefix: Path,
|
||||
host: str,
|
||||
kind: str,
|
||||
protect_bases: bool,
|
||||
yes: bool,
|
||||
max_delete: int,
|
||||
acquire_lock: bool = True,
|
||||
) -> dict[str, Any]:
|
||||
host = sanitize_host(host)
|
||||
if not yes:
|
||||
raise ConfigError("Refusing to delete snapshots without --yes")
|
||||
if max_delete < 0:
|
||||
raise ConfigError("--max-delete must be >= 0")
|
||||
|
||||
paths = PobsyncPaths(home=prefix)
|
||||
|
||||
def _do_apply() -> dict[str, Any]:
|
||||
plan = run_sql_retention_plan(host=host, kind=kind, protect_bases=bool(protect_bases))
|
||||
delete_list = plan.get("delete") or []
|
||||
if not isinstance(delete_list, list):
|
||||
raise ConfigError("Invalid retention plan output: delete is not a list")
|
||||
if max_delete == 0 and len(delete_list) > 0:
|
||||
raise ConfigError("Deletion blocked by --max-delete=0")
|
||||
if len(delete_list) > max_delete:
|
||||
raise ConfigError(f"Refusing to delete {len(delete_list)} snapshots (exceeds --max-delete={max_delete})")
|
||||
|
||||
actions: list[str] = []
|
||||
deleted: list[dict[str, Any]] = []
|
||||
|
||||
for item in delete_list:
|
||||
dirname = item.get("dirname") if isinstance(item, dict) else None
|
||||
snap_kind = item.get("kind") if isinstance(item, dict) else None
|
||||
snap_path = item.get("path") if isinstance(item, dict) else None
|
||||
if not isinstance(dirname, str) or not isinstance(snap_kind, str) or not isinstance(snap_path, str):
|
||||
continue
|
||||
if snap_kind not in {"scheduled", "manual"}:
|
||||
raise ConfigError(f"Refusing to delete unsupported snapshot kind: {snap_kind!r}")
|
||||
|
||||
path = Path(snap_path)
|
||||
if not path.exists():
|
||||
actions.append(f"skip missing {snap_kind}/{dirname}")
|
||||
continue
|
||||
if not path.is_dir():
|
||||
raise ConfigError(f"Refusing to delete non-directory path: {snap_path}")
|
||||
|
||||
shutil.rmtree(path)
|
||||
SnapshotRecord.objects.filter(host__host=host, kind=snap_kind, dirname=dirname).delete()
|
||||
actions.append(f"deleted {snap_kind} {dirname}")
|
||||
deleted.append({"dirname": dirname, "kind": snap_kind, "path": snap_path})
|
||||
|
||||
return {
|
||||
"ok": True,
|
||||
"host": host,
|
||||
"kind": kind,
|
||||
"protect_bases": bool(protect_bases),
|
||||
"max_delete": max_delete,
|
||||
"source": "sql",
|
||||
"deleted": deleted,
|
||||
"actions": actions,
|
||||
}
|
||||
|
||||
if acquire_lock:
|
||||
with acquire_host_lock(paths.locks_dir, host, command="retention-apply"):
|
||||
return _do_apply()
|
||||
return _do_apply()
|
||||
|
||||
|
||||
def _enabled_host_config(host: str) -> HostConfig:
|
||||
try:
|
||||
return HostConfig.objects.get(host=host, enabled=True)
|
||||
except HostConfig.DoesNotExist as exc:
|
||||
raise ConfigError(f"Missing enabled HostConfig {host!r}") from exc
|
||||
|
||||
|
||||
def _retention_for_host(host_config: HostConfig) -> dict[str, int]:
|
||||
return {
|
||||
"daily": host_config.retention_daily,
|
||||
"weekly": host_config.retention_weekly,
|
||||
"monthly": host_config.retention_monthly,
|
||||
"yearly": host_config.retention_yearly,
|
||||
}
|
||||
|
||||
|
||||
def _snapshots_for_retention(*, host_config: HostConfig, kind: str) -> list[Snapshot]:
|
||||
kinds = ["scheduled", "manual"] if kind == "all" else [kind]
|
||||
records = (
|
||||
SnapshotRecord.objects.filter(host=host_config, kind__in=kinds)
|
||||
.exclude(kind=SnapshotRecord.Kind.INCOMPLETE)
|
||||
.select_related("base")
|
||||
.order_by("-started_at", "dirname")
|
||||
)
|
||||
return [_snapshot_from_record(record) for record in records]
|
||||
|
||||
|
||||
def _snapshot_from_record(record: SnapshotRecord) -> Snapshot:
|
||||
return Snapshot(
|
||||
kind=record.kind,
|
||||
dirname=record.dirname,
|
||||
path=record.path,
|
||||
dt=record.started_at or datetime.fromtimestamp(0, tz=timezone.utc),
|
||||
status=record.status or None,
|
||||
base=_base_meta_from_record(record),
|
||||
)
|
||||
|
||||
|
||||
def _base_meta_from_record(record: SnapshotRecord) -> dict[str, str] | None:
|
||||
if record.base is not None:
|
||||
return {
|
||||
"kind": record.base.kind,
|
||||
"dirname": record.base.dirname,
|
||||
"path": record.base.path,
|
||||
}
|
||||
if record.base_kind and record.base_dirname:
|
||||
return {
|
||||
"kind": record.base_kind,
|
||||
"dirname": record.base_dirname,
|
||||
"path": record.base_path,
|
||||
}
|
||||
return None
|
||||
|
||||
|
||||
def _snapshot_to_delete_item(snapshot: Snapshot) -> dict[str, Any]:
|
||||
return {
|
||||
"dirname": snapshot.dirname,
|
||||
"kind": snapshot.kind,
|
||||
"path": snapshot.path,
|
||||
"dt": snapshot.dt.isoformat(),
|
||||
"status": snapshot.status,
|
||||
}
|
||||
Reference in New Issue
Block a user