feat(retention): show keep/delete details in human output

This commit is contained in:
2026-02-03 12:28:09 +01:00
parent 7a7c4ccaee
commit 1e5807790d
3 changed files with 288 additions and 3 deletions

View File

@@ -9,11 +9,12 @@ from .commands.doctor import run_doctor
from .commands.init_host import run_init_host
from .commands.install import run_install
from .commands.list_remotes import run_list_remotes
from .commands.retention_plan import run_retention_plan
from .commands.run_scheduled import run_scheduled
from .commands.show_config import dump_yaml, run_show_config
from .commands.snapshots_list import run_snapshots_list
from .commands.snapshots_show import run_snapshots_show
from .errors import ConfigError, DoctorError, InstallError, LockError, PobsyncError
from .errors import ConfigError, LockError, PobsyncError
from .paths import PobsyncPaths
from .util import is_tty, to_json_safe
@@ -77,16 +78,34 @@ def build_parser() -> argparse.ArgumentParser:
sn_list.add_argument("host", help="Host name")
sn_list.add_argument("--kind", default="all", help="scheduled|manual|incomplete|all (default: all)")
sn_list.add_argument("--limit", type=int, default=20, help="Max results (default: 20)")
sn_list.add_argument("--include-incomplete", action="store_true", help="Include .incomplete when --kind=all (default: false)")
sn_list.add_argument(
"--include-incomplete",
action="store_true",
help="Include .incomplete when --kind=all (default: false)",
)
sn_list.set_defaults(_handler=cmd_snapshots_list)
sn_show = sn_sub.add_parser("show", help="Show snapshot metadata")
sn_show.add_argument("host", help="Host name")
sn_show.add_argument("--kind", required=True, help="scheduled|manual|incomplete")
sn_show.add_argument("dirname", help="Snapshot directory name (e.g. 20260202-223807Z__K3VQEVH7)")
sn_show.add_argument("--tail", type=int, default=None, help="Show last N lines of rsync.log")
sn_show.add_argument(
"--tail",
type=int,
default=None,
help="Show last N lines of rsync.log",
)
sn_show.set_defaults(_handler=cmd_snapshots_show)
# retention
rt = sub.add_parser("retention", help="Retention management (dry-run)")
rt_sub = rt.add_subparsers(dest="retention_cmd", required=True)
rt_plan = rt_sub.add_parser("plan", help="Show retention prune plan (dry-run)")
rt_plan.add_argument("host", help="Host name")
rt_plan.add_argument("--kind", default="scheduled", help="scheduled|manual|all (default: scheduled)")
rt_plan.set_defaults(_handler=cmd_retention_plan)
return p
@@ -170,6 +189,36 @@ def _print(result: dict[str, Any], as_json: bool) -> None:
extra = " " + extra
print(f"- {kind} {dirname} {status}{extra}")
# retention plan
if "keep" in result and "delete" in result:
keep = result.get("keep") or []
delete = result.get("delete") or []
reasons = result.get("reasons") or {}
total = len(keep) + len(delete)
print(f"- total {total}")
print(f"- keep {len(keep)}")
print(f"- delete {len(delete)}")
if keep:
print("- keep:")
for d in keep:
rs = reasons.get(d) or []
rs_s = f" ({', '.join(rs)})" if rs else ""
print(f" - {d}{rs_s}")
if delete:
print("- delete:")
for item in delete:
dirname = item.get("dirname", "?")
dt = item.get("dt") or ""
status = item.get("status") or "unknown"
kind = item.get("kind", "?")
extra = " ".join(x for x in [kind, status, dt] if x)
if extra:
extra = " " + extra
print(f" - {dirname}{extra}")
def cmd_install(args: argparse.Namespace) -> int:
prefix = Path(args.prefix)
@@ -308,6 +357,13 @@ def cmd_snapshots_show(args: argparse.Namespace) -> int:
return 0 if result.get("ok") else 1
def cmd_retention_plan(args: argparse.Namespace) -> int:
prefix = Path(args.prefix)
result = run_retention_plan(prefix=prefix, host=args.host, kind=args.kind)
_print(result, as_json=bool(args.json))
return 0 if result.get("ok") else 1
def main(argv: list[str] | None = None) -> int:
parser = build_parser()
args = parser.parse_args(argv)

View File

@@ -0,0 +1,104 @@
from __future__ import annotations
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List
from ..config.load import load_global_config, load_host_config
from ..config.merge import build_effective_config
from ..errors import ConfigError
from ..paths import PobsyncPaths
from ..retention import Snapshot, build_retention_plan
from ..snapshot_meta import iter_snapshot_dirs, read_snapshot_meta, resolve_host_root
from ..util import sanitize_host
def _parse_snapshot_dt(dirname: str, meta: dict) -> datetime:
ts = meta.get("started_at")
if isinstance(ts, str) and ts.endswith("Z"):
try:
return datetime.strptime(ts, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=timezone.utc)
except ValueError:
pass
# fallback: dirname YYYYMMDD-HHMMSSZ__ID
try:
prefix = dirname.split("__", 1)[0]
return datetime.strptime(prefix, "%Y%m%d-%H%M%SZ").replace(tzinfo=timezone.utc)
except Exception:
return datetime.fromtimestamp(0, tz=timezone.utc)
def run_retention_plan(prefix: Path, host: str, kind: str) -> dict[str, Any]:
host = sanitize_host(host)
if kind not in {"scheduled", "manual", "all"}:
raise ConfigError("kind must be scheduled, manual, or all")
paths = PobsyncPaths(home=prefix)
global_cfg = load_global_config(paths.global_config_path)
host_cfg = load_host_config(paths.hosts_dir / f"{host}.yaml")
cfg = build_effective_config(global_cfg, host_cfg)
retention = cfg.get("retention")
if not isinstance(retention, dict):
raise ConfigError("No retention config found")
backup_root = cfg.get("backup_root")
if not isinstance(backup_root, str) or not backup_root.startswith("/"):
raise ConfigError("Invalid backup_root in config")
host_root = resolve_host_root(backup_root, host)
kinds: List[str]
if kind == "all":
kinds = ["scheduled", "manual"]
else:
kinds = [kind]
snapshots: List[Snapshot] = []
for kk in kinds:
for d in iter_snapshot_dirs(host_root, kk):
meta = read_snapshot_meta(d)
dt = _parse_snapshot_dt(d.name, meta)
snapshots.append(
Snapshot(
kind=kk,
dirname=d.name,
path=str(d),
dt=dt,
status=meta.get("status"),
base=meta.get("base"),
)
)
plan = build_retention_plan(
snapshots=snapshots,
retention=retention,
now=datetime.now(timezone.utc),
)
delete = [s for s in snapshots if s.dirname not in plan.keep]
return {
"ok": True,
"host": host,
"kind": kind,
"retention": retention,
"keep": sorted(plan.keep),
"delete": [
{
"dirname": s.dirname,
"kind": s.kind,
"path": s.path,
"dt": s.dt.isoformat(),
"status": s.status,
}
for s in delete
],
"reasons": plan.reasons,
}

125
src/pobsync/retention.py Normal file
View File

@@ -0,0 +1,125 @@
from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, Iterable, List, Optional, Set, Tuple
@dataclass(frozen=True)
class Snapshot:
kind: str # scheduled | manual
dirname: str
path: str
dt: datetime # UTC
status: Optional[str]
base: Optional[dict]
@dataclass
class RetentionResult:
keep: Set[str] # dirnames
reasons: Dict[str, List[str]]
def _bucket_day(dt: datetime) -> str:
return dt.strftime("%Y-%m-%d")
def _bucket_week(dt: datetime) -> str:
iso = dt.isocalendar()
return f"{iso.year}-W{iso.week:02d}"
def _bucket_month(dt: datetime) -> str:
return dt.strftime("%Y-%m")
def _bucket_year(dt: datetime) -> str:
return dt.strftime("%Y")
def _window_start(now: datetime, unit: str, count: int) -> datetime:
if count <= 0:
return now + timedelta(days=1)
if unit == "daily":
return (now - timedelta(days=count - 1)).replace(hour=0, minute=0, second=0, microsecond=0)
if unit == "weekly":
return now - timedelta(weeks=count - 1)
if unit == "monthly":
return now.replace(day=1) - timedelta(days=32 * (count - 1))
if unit == "yearly":
return now.replace(month=1, day=1) - timedelta(days=366 * (count - 1))
raise ValueError(unit)
def build_retention_plan(
snapshots: Iterable[Snapshot],
retention: Dict[str, int],
now: Optional[datetime] = None,
) -> RetentionResult:
"""
Build a dry-run retention plan.
Returns:
- keep: set of snapshot dirnames to keep
- reasons: mapping dirname -> list of reasons why it is kept
"""
if now is None:
now = datetime.now(timezone.utc)
snaps = sorted(snapshots, key=lambda s: s.dt, reverse=True)
keep: Set[str] = set()
reasons: Dict[str, List[str]] = {}
def mark(dirname: str, reason: str) -> None:
keep.add(dirname)
reasons.setdefault(dirname, []).append(reason)
# Always keep newest snapshot overall (if any)
if snaps:
mark(snaps[0].dirname, "newest")
# Retention buckets
rules = [
("daily", retention.get("daily", 0), _bucket_day),
("weekly", retention.get("weekly", 0), _bucket_week),
("monthly", retention.get("monthly", 0), _bucket_month),
("yearly", retention.get("yearly", 0), _bucket_year),
]
for name, count, bucket_fn in rules:
if count <= 0:
continue
window_start = _window_start(now, name, count)
seen: Set[str] = set()
for s in snaps:
if s.dt < window_start:
break
bucket = bucket_fn(s.dt)
if bucket in seen:
continue
# Prefer successful snapshots, but allow fallback
if s.status not in (None, "success"):
continue
seen.add(bucket)
mark(s.dirname, f"{name}:{bucket}")
# Fallback: if a bucket had no success, allow newest non-success
for s in snaps:
if s.dt < window_start:
break
bucket = bucket_fn(s.dt)
if bucket in seen:
continue
seen.add(bucket)
mark(s.dirname, f"{name}:{bucket}:fallback")
return RetentionResult(keep=keep, reasons=reasons)