feat(retention): add destructive apply command with safety guards

This commit is contained in:
2026-02-03 12:50:35 +01:00
parent fdd292b1b6
commit 93acc58770
2 changed files with 133 additions and 1 deletions

View File

@@ -9,6 +9,7 @@ from .commands.doctor import run_doctor
from .commands.init_host import run_init_host
from .commands.install import run_install
from .commands.list_remotes import run_list_remotes
from .commands.retention_apply import run_retention_apply
from .commands.retention_plan import run_retention_plan
from .commands.run_scheduled import run_scheduled
from .commands.show_config import dump_yaml, run_show_config
@@ -98,7 +99,7 @@ def build_parser() -> argparse.ArgumentParser:
sn_show.set_defaults(_handler=cmd_snapshots_show)
# retention
rt = sub.add_parser("retention", help="Retention management (dry-run)")
rt = sub.add_parser("retention", help="Retention management")
rt_sub = rt.add_subparsers(dest="retention_cmd", required=True)
rt_plan = rt_sub.add_parser("plan", help="Show retention prune plan (dry-run)")
@@ -111,6 +112,27 @@ def build_parser() -> argparse.ArgumentParser:
)
rt_plan.set_defaults(_handler=cmd_retention_plan)
rt_apply = rt_sub.add_parser("apply", help="Apply retention plan (DESTRUCTIVE)")
rt_apply.add_argument("host", help="Host name")
rt_apply.add_argument("--kind", default="scheduled", help="scheduled|manual|all (default: scheduled)")
rt_apply.add_argument(
"--protect-bases",
action="store_true",
help="Also keep base snapshots referenced in meta (default: false)",
)
rt_apply.add_argument(
"--max-delete",
type=int,
default=10,
help="Refuse to delete more than N snapshots (default: 10; set 0 to block)",
)
rt_apply.add_argument(
"--yes",
action="store_true",
help="Confirm deletion",
)
rt_apply.set_defaults(_handler=cmd_retention_apply)
return p
@@ -372,6 +394,20 @@ def cmd_retention_plan(args: argparse.Namespace) -> int:
return 0 if result.get("ok") else 1
def cmd_retention_apply(args: argparse.Namespace) -> int:
prefix = Path(args.prefix)
result = run_retention_apply(
prefix=prefix,
host=args.host,
kind=args.kind,
protect_bases=bool(args.protect_bases),
yes=bool(args.yes),
max_delete=int(args.max_delete),
)
_print(result, as_json=bool(args.json))
return 0 if result.get("ok") else 1
def main(argv: list[str] | None = None) -> int:
parser = build_parser()
args = parser.parse_args(argv)

View File

@@ -0,0 +1,96 @@
from __future__ import annotations
import shutil
from pathlib import Path
from typing import Any, Dict, List
from ..errors import ConfigError
from ..lock import acquire_host_lock
from ..paths import PobsyncPaths
from ..util import sanitize_host
from .retention_plan import run_retention_plan
def run_retention_apply(
prefix: Path,
host: str,
kind: str,
protect_bases: bool,
yes: bool,
max_delete: int,
) -> dict[str, Any]:
host = sanitize_host(host)
if kind not in {"scheduled", "manual", "all"}:
raise ConfigError("kind must be scheduled, manual, or all")
if not yes:
raise ConfigError("Refusing to delete snapshots without --yes")
if max_delete < 0:
raise ConfigError("--max-delete must be >= 0")
paths = PobsyncPaths(home=prefix)
with acquire_host_lock(paths.locks_dir, host, command="retention-apply"):
plan = run_retention_plan(prefix=prefix, host=host, kind=kind, protect_bases=bool(protect_bases))
delete_list = plan.get("delete") or []
if not isinstance(delete_list, list):
raise ConfigError("Invalid retention plan output: delete is not a list")
if max_delete == 0 and len(delete_list) > 0:
raise ConfigError("Deletion blocked by --max-delete=0")
if len(delete_list) > max_delete:
raise ConfigError(f"Refusing to delete {len(delete_list)} snapshots (exceeds --max-delete={max_delete})")
actions: List[str] = []
deleted: List[Dict[str, Any]] = []
for item in delete_list:
if not isinstance(item, dict):
continue
dirname = item.get("dirname")
snap_kind = item.get("kind")
snap_path = item.get("path")
if not isinstance(dirname, str) or not isinstance(snap_kind, str) or not isinstance(snap_path, str):
continue
# Hard safety: only allow scheduled/manual deletions from plan
if snap_kind not in {"scheduled", "manual"}:
raise ConfigError(f"Refusing to delete unsupported snapshot kind: {snap_kind!r}")
p = Path(snap_path)
if not p.exists():
actions.append(f"skip missing {snap_kind}/{dirname}")
continue
if not p.is_dir():
raise ConfigError(f"Refusing to delete non-directory path: {snap_path}")
# Destructive action
shutil.rmtree(p)
actions.append(f"deleted {snap_kind} {dirname}")
deleted.append(
{
"dirname": dirname,
"kind": snap_kind,
"path": snap_path,
}
)
return {
"ok": True,
"host": host,
"kind": kind,
"protect_bases": bool(protect_bases),
"max_delete": max_delete,
"deleted": deleted,
"actions": actions,
}