Compare commits

...

2 Commits

3 changed files with 207 additions and 7 deletions

View File

@@ -9,6 +9,7 @@ from .commands.doctor import run_doctor
from .commands.init_host import run_init_host from .commands.init_host import run_init_host
from .commands.install import run_install from .commands.install import run_install
from .commands.list_remotes import run_list_remotes from .commands.list_remotes import run_list_remotes
from .commands.retention_apply import run_retention_apply
from .commands.retention_plan import run_retention_plan from .commands.retention_plan import run_retention_plan
from .commands.run_scheduled import run_scheduled from .commands.run_scheduled import run_scheduled
from .commands.show_config import dump_yaml, run_show_config from .commands.show_config import dump_yaml, run_show_config
@@ -98,14 +99,40 @@ def build_parser() -> argparse.ArgumentParser:
sn_show.set_defaults(_handler=cmd_snapshots_show) sn_show.set_defaults(_handler=cmd_snapshots_show)
# retention # retention
rt = sub.add_parser("retention", help="Retention management (dry-run)") rt = sub.add_parser("retention", help="Retention management")
rt_sub = rt.add_subparsers(dest="retention_cmd", required=True) rt_sub = rt.add_subparsers(dest="retention_cmd", required=True)
rt_plan = rt_sub.add_parser("plan", help="Show retention prune plan (dry-run)") rt_plan = rt_sub.add_parser("plan", help="Show retention prune plan (dry-run)")
rt_plan.add_argument("host", help="Host name") rt_plan.add_argument("host", help="Host name")
rt_plan.add_argument("--kind", default="scheduled", help="scheduled|manual|all (default: scheduled)") rt_plan.add_argument("--kind", default="scheduled", help="scheduled|manual|all (default: scheduled)")
rt_plan.add_argument(
"--protect-bases",
action="store_true",
help="Also keep base snapshots referenced in meta (default: false)",
)
rt_plan.set_defaults(_handler=cmd_retention_plan) rt_plan.set_defaults(_handler=cmd_retention_plan)
rt_apply = rt_sub.add_parser("apply", help="Apply retention plan (DESTRUCTIVE)")
rt_apply.add_argument("host", help="Host name")
rt_apply.add_argument("--kind", default="scheduled", help="scheduled|manual|all (default: scheduled)")
rt_apply.add_argument(
"--protect-bases",
action="store_true",
help="Also keep base snapshots referenced in meta (default: false)",
)
rt_apply.add_argument(
"--max-delete",
type=int,
default=10,
help="Refuse to delete more than N snapshots (default: 10; set 0 to block)",
)
rt_apply.add_argument(
"--yes",
action="store_true",
help="Confirm deletion",
)
rt_apply.set_defaults(_handler=cmd_retention_apply)
return p return p
@@ -200,6 +227,9 @@ def _print(result: dict[str, Any], as_json: bool) -> None:
print(f"- keep {len(keep)}") print(f"- keep {len(keep)}")
print(f"- delete {len(delete)}") print(f"- delete {len(delete)}")
if result.get("protect_bases") is True:
print("- protect_bases true")
if keep: if keep:
print("- keep:") print("- keep:")
for d in keep: for d in keep:
@@ -359,7 +389,21 @@ def cmd_snapshots_show(args: argparse.Namespace) -> int:
def cmd_retention_plan(args: argparse.Namespace) -> int: def cmd_retention_plan(args: argparse.Namespace) -> int:
prefix = Path(args.prefix) prefix = Path(args.prefix)
result = run_retention_plan(prefix=prefix, host=args.host, kind=args.kind) result = run_retention_plan(prefix=prefix, host=args.host, kind=args.kind, protect_bases=bool(args.protect_bases))
_print(result, as_json=bool(args.json))
return 0 if result.get("ok") else 1
def cmd_retention_apply(args: argparse.Namespace) -> int:
prefix = Path(args.prefix)
result = run_retention_apply(
prefix=prefix,
host=args.host,
kind=args.kind,
protect_bases=bool(args.protect_bases),
yes=bool(args.yes),
max_delete=int(args.max_delete),
)
_print(result, as_json=bool(args.json)) _print(result, as_json=bool(args.json))
return 0 if result.get("ok") else 1 return 0 if result.get("ok") else 1

View File

@@ -0,0 +1,96 @@
from __future__ import annotations
import shutil
from pathlib import Path
from typing import Any, Dict, List
from ..errors import ConfigError
from ..lock import acquire_host_lock
from ..paths import PobsyncPaths
from ..util import sanitize_host
from .retention_plan import run_retention_plan
def run_retention_apply(
prefix: Path,
host: str,
kind: str,
protect_bases: bool,
yes: bool,
max_delete: int,
) -> dict[str, Any]:
host = sanitize_host(host)
if kind not in {"scheduled", "manual", "all"}:
raise ConfigError("kind must be scheduled, manual, or all")
if not yes:
raise ConfigError("Refusing to delete snapshots without --yes")
if max_delete < 0:
raise ConfigError("--max-delete must be >= 0")
paths = PobsyncPaths(home=prefix)
with acquire_host_lock(paths.locks_dir, host, command="retention-apply"):
plan = run_retention_plan(prefix=prefix, host=host, kind=kind, protect_bases=bool(protect_bases))
delete_list = plan.get("delete") or []
if not isinstance(delete_list, list):
raise ConfigError("Invalid retention plan output: delete is not a list")
if max_delete == 0 and len(delete_list) > 0:
raise ConfigError("Deletion blocked by --max-delete=0")
if len(delete_list) > max_delete:
raise ConfigError(f"Refusing to delete {len(delete_list)} snapshots (exceeds --max-delete={max_delete})")
actions: List[str] = []
deleted: List[Dict[str, Any]] = []
for item in delete_list:
if not isinstance(item, dict):
continue
dirname = item.get("dirname")
snap_kind = item.get("kind")
snap_path = item.get("path")
if not isinstance(dirname, str) or not isinstance(snap_kind, str) or not isinstance(snap_path, str):
continue
# Hard safety: only allow scheduled/manual deletions from plan
if snap_kind not in {"scheduled", "manual"}:
raise ConfigError(f"Refusing to delete unsupported snapshot kind: {snap_kind!r}")
p = Path(snap_path)
if not p.exists():
actions.append(f"skip missing {snap_kind}/{dirname}")
continue
if not p.is_dir():
raise ConfigError(f"Refusing to delete non-directory path: {snap_path}")
# Destructive action
shutil.rmtree(p)
actions.append(f"deleted {snap_kind} {dirname}")
deleted.append(
{
"dirname": dirname,
"kind": snap_kind,
"path": snap_path,
}
)
return {
"ok": True,
"host": host,
"kind": kind,
"protect_bases": bool(protect_bases),
"max_delete": max_delete,
"deleted": deleted,
"actions": actions,
}

View File

@@ -2,7 +2,7 @@ from __future__ import annotations
from datetime import datetime, timezone from datetime import datetime, timezone
from pathlib import Path from pathlib import Path
from typing import Any, Dict, List from typing import Any, Dict, List, Optional, Tuple
from ..config.load import load_global_config, load_host_config from ..config.load import load_global_config, load_host_config
from ..config.merge import build_effective_config from ..config.merge import build_effective_config
@@ -29,7 +29,60 @@ def _parse_snapshot_dt(dirname: str, meta: dict) -> datetime:
return datetime.fromtimestamp(0, tz=timezone.utc) return datetime.fromtimestamp(0, tz=timezone.utc)
def run_retention_plan(prefix: Path, host: str, kind: str) -> dict[str, Any]: def _apply_base_protection(
snapshots: List[Snapshot],
keep: set[str],
reasons: Dict[str, List[str]],
) -> Tuple[set[str], Dict[str, List[str]]]:
"""
Optional policy: if a kept snapshot has a base (kind+dirname), also keep that base snapshot.
This is NOT required for hardlink snapshots to remain readable, but can be useful
for performance (better base selection) or "chain" readability.
Adds reason: "base-of:<child_dirname>"
"""
# Index snapshots by (kind, dirname)
idx: Dict[Tuple[str, str], Snapshot] = {(s.kind, s.dirname): s for s in snapshots}
changed = True
while changed:
changed = False
# Iterate over a stable list of current keep items
for child_dirname in list(keep):
# Find the child snapshot (may exist in multiple kinds; check both)
child: Optional[Snapshot] = None
for k in ("scheduled", "manual"):
child = idx.get((k, child_dirname))
if child is not None:
break
if child is None:
continue
base = child.base
if not isinstance(base, dict):
continue
base_kind = base.get("kind")
base_dirname = base.get("dirname")
if not isinstance(base_kind, str) or not isinstance(base_dirname, str):
continue
base_snap = idx.get((base_kind, base_dirname))
if base_snap is None:
# Base might have been pruned already or never existed; ignore.
continue
if base_dirname not in keep:
keep.add(base_dirname)
reasons.setdefault(base_dirname, []).append(f"base-of:{child_dirname}")
changed = True
return keep, reasons
def run_retention_plan(prefix: Path, host: str, kind: str, protect_bases: bool) -> dict[str, Any]:
host = sanitize_host(host) host = sanitize_host(host)
if kind not in {"scheduled", "manual", "all"}: if kind not in {"scheduled", "manual", "all"}:
@@ -81,14 +134,21 @@ def run_retention_plan(prefix: Path, host: str, kind: str) -> dict[str, Any]:
now=datetime.now(timezone.utc), now=datetime.now(timezone.utc),
) )
delete = [s for s in snapshots if s.dirname not in plan.keep] keep = set(plan.keep)
reasons = dict(plan.reasons)
if protect_bases:
keep, reasons = _apply_base_protection(snapshots=snapshots, keep=keep, reasons=reasons)
delete = [s for s in snapshots if s.dirname not in keep]
return { return {
"ok": True, "ok": True,
"host": host, "host": host,
"kind": kind, "kind": kind,
"protect_bases": bool(protect_bases),
"retention": retention, "retention": retention,
"keep": sorted(plan.keep), "keep": sorted(keep),
"delete": [ "delete": [
{ {
"dirname": s.dirname, "dirname": s.dirname,
@@ -99,6 +159,6 @@ def run_retention_plan(prefix: Path, host: str, kind: str) -> dict[str, Any]:
} }
for s in delete for s in delete
], ],
"reasons": plan.reasons, "reasons": reasons,
} }