Compare commits

...

2 Commits

3 changed files with 207 additions and 7 deletions

View File

@@ -9,6 +9,7 @@ from .commands.doctor import run_doctor
from .commands.init_host import run_init_host
from .commands.install import run_install
from .commands.list_remotes import run_list_remotes
from .commands.retention_apply import run_retention_apply
from .commands.retention_plan import run_retention_plan
from .commands.run_scheduled import run_scheduled
from .commands.show_config import dump_yaml, run_show_config
@@ -98,14 +99,40 @@ def build_parser() -> argparse.ArgumentParser:
sn_show.set_defaults(_handler=cmd_snapshots_show)
# retention
rt = sub.add_parser("retention", help="Retention management (dry-run)")
rt = sub.add_parser("retention", help="Retention management")
rt_sub = rt.add_subparsers(dest="retention_cmd", required=True)
rt_plan = rt_sub.add_parser("plan", help="Show retention prune plan (dry-run)")
rt_plan.add_argument("host", help="Host name")
rt_plan.add_argument("--kind", default="scheduled", help="scheduled|manual|all (default: scheduled)")
rt_plan.add_argument(
"--protect-bases",
action="store_true",
help="Also keep base snapshots referenced in meta (default: false)",
)
rt_plan.set_defaults(_handler=cmd_retention_plan)
rt_apply = rt_sub.add_parser("apply", help="Apply retention plan (DESTRUCTIVE)")
rt_apply.add_argument("host", help="Host name")
rt_apply.add_argument("--kind", default="scheduled", help="scheduled|manual|all (default: scheduled)")
rt_apply.add_argument(
"--protect-bases",
action="store_true",
help="Also keep base snapshots referenced in meta (default: false)",
)
rt_apply.add_argument(
"--max-delete",
type=int,
default=10,
help="Refuse to delete more than N snapshots (default: 10; set 0 to block)",
)
rt_apply.add_argument(
"--yes",
action="store_true",
help="Confirm deletion",
)
rt_apply.set_defaults(_handler=cmd_retention_apply)
return p
@@ -200,6 +227,9 @@ def _print(result: dict[str, Any], as_json: bool) -> None:
print(f"- keep {len(keep)}")
print(f"- delete {len(delete)}")
if result.get("protect_bases") is True:
print("- protect_bases true")
if keep:
print("- keep:")
for d in keep:
@@ -359,7 +389,21 @@ def cmd_snapshots_show(args: argparse.Namespace) -> int:
def cmd_retention_plan(args: argparse.Namespace) -> int:
prefix = Path(args.prefix)
result = run_retention_plan(prefix=prefix, host=args.host, kind=args.kind)
result = run_retention_plan(prefix=prefix, host=args.host, kind=args.kind, protect_bases=bool(args.protect_bases))
_print(result, as_json=bool(args.json))
return 0 if result.get("ok") else 1
def cmd_retention_apply(args: argparse.Namespace) -> int:
prefix = Path(args.prefix)
result = run_retention_apply(
prefix=prefix,
host=args.host,
kind=args.kind,
protect_bases=bool(args.protect_bases),
yes=bool(args.yes),
max_delete=int(args.max_delete),
)
_print(result, as_json=bool(args.json))
return 0 if result.get("ok") else 1

View File

@@ -0,0 +1,96 @@
from __future__ import annotations
import shutil
from pathlib import Path
from typing import Any, Dict, List
from ..errors import ConfigError
from ..lock import acquire_host_lock
from ..paths import PobsyncPaths
from ..util import sanitize_host
from .retention_plan import run_retention_plan
def run_retention_apply(
prefix: Path,
host: str,
kind: str,
protect_bases: bool,
yes: bool,
max_delete: int,
) -> dict[str, Any]:
host = sanitize_host(host)
if kind not in {"scheduled", "manual", "all"}:
raise ConfigError("kind must be scheduled, manual, or all")
if not yes:
raise ConfigError("Refusing to delete snapshots without --yes")
if max_delete < 0:
raise ConfigError("--max-delete must be >= 0")
paths = PobsyncPaths(home=prefix)
with acquire_host_lock(paths.locks_dir, host, command="retention-apply"):
plan = run_retention_plan(prefix=prefix, host=host, kind=kind, protect_bases=bool(protect_bases))
delete_list = plan.get("delete") or []
if not isinstance(delete_list, list):
raise ConfigError("Invalid retention plan output: delete is not a list")
if max_delete == 0 and len(delete_list) > 0:
raise ConfigError("Deletion blocked by --max-delete=0")
if len(delete_list) > max_delete:
raise ConfigError(f"Refusing to delete {len(delete_list)} snapshots (exceeds --max-delete={max_delete})")
actions: List[str] = []
deleted: List[Dict[str, Any]] = []
for item in delete_list:
if not isinstance(item, dict):
continue
dirname = item.get("dirname")
snap_kind = item.get("kind")
snap_path = item.get("path")
if not isinstance(dirname, str) or not isinstance(snap_kind, str) or not isinstance(snap_path, str):
continue
# Hard safety: only allow scheduled/manual deletions from plan
if snap_kind not in {"scheduled", "manual"}:
raise ConfigError(f"Refusing to delete unsupported snapshot kind: {snap_kind!r}")
p = Path(snap_path)
if not p.exists():
actions.append(f"skip missing {snap_kind}/{dirname}")
continue
if not p.is_dir():
raise ConfigError(f"Refusing to delete non-directory path: {snap_path}")
# Destructive action
shutil.rmtree(p)
actions.append(f"deleted {snap_kind} {dirname}")
deleted.append(
{
"dirname": dirname,
"kind": snap_kind,
"path": snap_path,
}
)
return {
"ok": True,
"host": host,
"kind": kind,
"protect_bases": bool(protect_bases),
"max_delete": max_delete,
"deleted": deleted,
"actions": actions,
}

View File

@@ -2,7 +2,7 @@ from __future__ import annotations
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List
from typing import Any, Dict, List, Optional, Tuple
from ..config.load import load_global_config, load_host_config
from ..config.merge import build_effective_config
@@ -29,7 +29,60 @@ def _parse_snapshot_dt(dirname: str, meta: dict) -> datetime:
return datetime.fromtimestamp(0, tz=timezone.utc)
def run_retention_plan(prefix: Path, host: str, kind: str) -> dict[str, Any]:
def _apply_base_protection(
snapshots: List[Snapshot],
keep: set[str],
reasons: Dict[str, List[str]],
) -> Tuple[set[str], Dict[str, List[str]]]:
"""
Optional policy: if a kept snapshot has a base (kind+dirname), also keep that base snapshot.
This is NOT required for hardlink snapshots to remain readable, but can be useful
for performance (better base selection) or "chain" readability.
Adds reason: "base-of:<child_dirname>"
"""
# Index snapshots by (kind, dirname)
idx: Dict[Tuple[str, str], Snapshot] = {(s.kind, s.dirname): s for s in snapshots}
changed = True
while changed:
changed = False
# Iterate over a stable list of current keep items
for child_dirname in list(keep):
# Find the child snapshot (may exist in multiple kinds; check both)
child: Optional[Snapshot] = None
for k in ("scheduled", "manual"):
child = idx.get((k, child_dirname))
if child is not None:
break
if child is None:
continue
base = child.base
if not isinstance(base, dict):
continue
base_kind = base.get("kind")
base_dirname = base.get("dirname")
if not isinstance(base_kind, str) or not isinstance(base_dirname, str):
continue
base_snap = idx.get((base_kind, base_dirname))
if base_snap is None:
# Base might have been pruned already or never existed; ignore.
continue
if base_dirname not in keep:
keep.add(base_dirname)
reasons.setdefault(base_dirname, []).append(f"base-of:{child_dirname}")
changed = True
return keep, reasons
def run_retention_plan(prefix: Path, host: str, kind: str, protect_bases: bool) -> dict[str, Any]:
host = sanitize_host(host)
if kind not in {"scheduled", "manual", "all"}:
@@ -81,14 +134,21 @@ def run_retention_plan(prefix: Path, host: str, kind: str) -> dict[str, Any]:
now=datetime.now(timezone.utc),
)
delete = [s for s in snapshots if s.dirname not in plan.keep]
keep = set(plan.keep)
reasons = dict(plan.reasons)
if protect_bases:
keep, reasons = _apply_base_protection(snapshots=snapshots, keep=keep, reasons=reasons)
delete = [s for s in snapshots if s.dirname not in keep]
return {
"ok": True,
"host": host,
"kind": kind,
"protect_bases": bool(protect_bases),
"retention": retention,
"keep": sorted(plan.keep),
"keep": sorted(keep),
"delete": [
{
"dirname": s.dirname,
@@ -99,6 +159,6 @@ def run_retention_plan(prefix: Path, host: str, kind: str) -> dict[str, Any]:
}
for s in delete
],
"reasons": plan.reasons,
"reasons": reasons,
}