feat(retention): add optional base protection to plan

This commit is contained in:
2026-02-03 12:42:54 +01:00
parent 1e5807790d
commit fdd292b1b6
2 changed files with 74 additions and 6 deletions

View File

@@ -104,6 +104,11 @@ def build_parser() -> argparse.ArgumentParser:
rt_plan = rt_sub.add_parser("plan", help="Show retention prune plan (dry-run)") rt_plan = rt_sub.add_parser("plan", help="Show retention prune plan (dry-run)")
rt_plan.add_argument("host", help="Host name") rt_plan.add_argument("host", help="Host name")
rt_plan.add_argument("--kind", default="scheduled", help="scheduled|manual|all (default: scheduled)") rt_plan.add_argument("--kind", default="scheduled", help="scheduled|manual|all (default: scheduled)")
rt_plan.add_argument(
"--protect-bases",
action="store_true",
help="Also keep base snapshots referenced in meta (default: false)",
)
rt_plan.set_defaults(_handler=cmd_retention_plan) rt_plan.set_defaults(_handler=cmd_retention_plan)
return p return p
@@ -200,6 +205,9 @@ def _print(result: dict[str, Any], as_json: bool) -> None:
print(f"- keep {len(keep)}") print(f"- keep {len(keep)}")
print(f"- delete {len(delete)}") print(f"- delete {len(delete)}")
if result.get("protect_bases") is True:
print("- protect_bases true")
if keep: if keep:
print("- keep:") print("- keep:")
for d in keep: for d in keep:
@@ -359,7 +367,7 @@ def cmd_snapshots_show(args: argparse.Namespace) -> int:
def cmd_retention_plan(args: argparse.Namespace) -> int: def cmd_retention_plan(args: argparse.Namespace) -> int:
prefix = Path(args.prefix) prefix = Path(args.prefix)
result = run_retention_plan(prefix=prefix, host=args.host, kind=args.kind) result = run_retention_plan(prefix=prefix, host=args.host, kind=args.kind, protect_bases=bool(args.protect_bases))
_print(result, as_json=bool(args.json)) _print(result, as_json=bool(args.json))
return 0 if result.get("ok") else 1 return 0 if result.get("ok") else 1

View File

@@ -2,7 +2,7 @@ from __future__ import annotations
from datetime import datetime, timezone from datetime import datetime, timezone
from pathlib import Path from pathlib import Path
from typing import Any, Dict, List from typing import Any, Dict, List, Optional, Tuple
from ..config.load import load_global_config, load_host_config from ..config.load import load_global_config, load_host_config
from ..config.merge import build_effective_config from ..config.merge import build_effective_config
@@ -29,7 +29,60 @@ def _parse_snapshot_dt(dirname: str, meta: dict) -> datetime:
return datetime.fromtimestamp(0, tz=timezone.utc) return datetime.fromtimestamp(0, tz=timezone.utc)
def run_retention_plan(prefix: Path, host: str, kind: str) -> dict[str, Any]: def _apply_base_protection(
snapshots: List[Snapshot],
keep: set[str],
reasons: Dict[str, List[str]],
) -> Tuple[set[str], Dict[str, List[str]]]:
"""
Optional policy: if a kept snapshot has a base (kind+dirname), also keep that base snapshot.
This is NOT required for hardlink snapshots to remain readable, but can be useful
for performance (better base selection) or "chain" readability.
Adds reason: "base-of:<child_dirname>"
"""
# Index snapshots by (kind, dirname)
idx: Dict[Tuple[str, str], Snapshot] = {(s.kind, s.dirname): s for s in snapshots}
changed = True
while changed:
changed = False
# Iterate over a stable list of current keep items
for child_dirname in list(keep):
# Find the child snapshot (may exist in multiple kinds; check both)
child: Optional[Snapshot] = None
for k in ("scheduled", "manual"):
child = idx.get((k, child_dirname))
if child is not None:
break
if child is None:
continue
base = child.base
if not isinstance(base, dict):
continue
base_kind = base.get("kind")
base_dirname = base.get("dirname")
if not isinstance(base_kind, str) or not isinstance(base_dirname, str):
continue
base_snap = idx.get((base_kind, base_dirname))
if base_snap is None:
# Base might have been pruned already or never existed; ignore.
continue
if base_dirname not in keep:
keep.add(base_dirname)
reasons.setdefault(base_dirname, []).append(f"base-of:{child_dirname}")
changed = True
return keep, reasons
def run_retention_plan(prefix: Path, host: str, kind: str, protect_bases: bool) -> dict[str, Any]:
host = sanitize_host(host) host = sanitize_host(host)
if kind not in {"scheduled", "manual", "all"}: if kind not in {"scheduled", "manual", "all"}:
@@ -81,14 +134,21 @@ def run_retention_plan(prefix: Path, host: str, kind: str) -> dict[str, Any]:
now=datetime.now(timezone.utc), now=datetime.now(timezone.utc),
) )
delete = [s for s in snapshots if s.dirname not in plan.keep] keep = set(plan.keep)
reasons = dict(plan.reasons)
if protect_bases:
keep, reasons = _apply_base_protection(snapshots=snapshots, keep=keep, reasons=reasons)
delete = [s for s in snapshots if s.dirname not in keep]
return { return {
"ok": True, "ok": True,
"host": host, "host": host,
"kind": kind, "kind": kind,
"protect_bases": bool(protect_bases),
"retention": retention, "retention": retention,
"keep": sorted(plan.keep), "keep": sorted(keep),
"delete": [ "delete": [
{ {
"dirname": s.dirname, "dirname": s.dirname,
@@ -99,6 +159,6 @@ def run_retention_plan(prefix: Path, host: str, kind: str) -> dict[str, Any]:
} }
for s in delete for s in delete
], ],
"reasons": plan.reasons, "reasons": reasons,
} }