diff --git a/src/pobsync/cli.py b/src/pobsync/cli.py index 8b48939..294f3dc 100644 --- a/src/pobsync/cli.py +++ b/src/pobsync/cli.py @@ -9,11 +9,12 @@ from .commands.doctor import run_doctor from .commands.init_host import run_init_host from .commands.install import run_install from .commands.list_remotes import run_list_remotes +from .commands.retention_plan import run_retention_plan from .commands.run_scheduled import run_scheduled from .commands.show_config import dump_yaml, run_show_config from .commands.snapshots_list import run_snapshots_list from .commands.snapshots_show import run_snapshots_show -from .errors import ConfigError, DoctorError, InstallError, LockError, PobsyncError +from .errors import ConfigError, LockError, PobsyncError from .paths import PobsyncPaths from .util import is_tty, to_json_safe @@ -77,16 +78,34 @@ def build_parser() -> argparse.ArgumentParser: sn_list.add_argument("host", help="Host name") sn_list.add_argument("--kind", default="all", help="scheduled|manual|incomplete|all (default: all)") sn_list.add_argument("--limit", type=int, default=20, help="Max results (default: 20)") - sn_list.add_argument("--include-incomplete", action="store_true", help="Include .incomplete when --kind=all (default: false)") + sn_list.add_argument( + "--include-incomplete", + action="store_true", + help="Include .incomplete when --kind=all (default: false)", + ) sn_list.set_defaults(_handler=cmd_snapshots_list) sn_show = sn_sub.add_parser("show", help="Show snapshot metadata") sn_show.add_argument("host", help="Host name") sn_show.add_argument("--kind", required=True, help="scheduled|manual|incomplete") sn_show.add_argument("dirname", help="Snapshot directory name (e.g. 
20260202-223807Z__K3VQEVH7)") - sn_show.add_argument("--tail", type=int, default=None, help="Show last N lines of rsync.log") + sn_show.add_argument( + "--tail", + type=int, + default=None, + help="Show last N lines of rsync.log", + ) sn_show.set_defaults(_handler=cmd_snapshots_show) + # retention + rt = sub.add_parser("retention", help="Retention management (dry-run)") + rt_sub = rt.add_subparsers(dest="retention_cmd", required=True) + + rt_plan = rt_sub.add_parser("plan", help="Show retention prune plan (dry-run)") + rt_plan.add_argument("host", help="Host name") + rt_plan.add_argument("--kind", default="scheduled", help="scheduled|manual|all (default: scheduled)") + rt_plan.set_defaults(_handler=cmd_retention_plan) + return p @@ -170,6 +189,36 @@ def _print(result: dict[str, Any], as_json: bool) -> None: extra = " " + extra print(f"- {kind} {dirname} {status}{extra}") + # retention plan + if "keep" in result and "delete" in result: + keep = result.get("keep") or [] + delete = result.get("delete") or [] + reasons = result.get("reasons") or {} + + total = len(keep) + len(delete) + print(f"- total {total}") + print(f"- keep {len(keep)}") + print(f"- delete {len(delete)}") + + if keep: + print("- keep:") + for d in keep: + rs = reasons.get(d) or [] + rs_s = f" ({', '.join(rs)})" if rs else "" + print(f" - {d}{rs_s}") + + if delete: + print("- delete:") + for item in delete: + dirname = item.get("dirname", "?") + dt = item.get("dt") or "" + status = item.get("status") or "unknown" + kind = item.get("kind", "?") + extra = " ".join(x for x in [kind, status, dt] if x) + if extra: + extra = " " + extra + print(f" - {dirname}{extra}") + def cmd_install(args: argparse.Namespace) -> int: prefix = Path(args.prefix) @@ -308,6 +357,13 @@ def cmd_snapshots_show(args: argparse.Namespace) -> int: return 0 if result.get("ok") else 1 +def cmd_retention_plan(args: argparse.Namespace) -> int: + prefix = Path(args.prefix) + result = run_retention_plan(prefix=prefix, host=args.host, 
kind=args.kind) + _print(result, as_json=bool(args.json)) + return 0 if result.get("ok") else 1 + + def main(argv: list[str] | None = None) -> int: parser = build_parser() args = parser.parse_args(argv) diff --git a/src/pobsync/commands/retention_plan.py b/src/pobsync/commands/retention_plan.py new file mode 100644 index 0000000..ef6902e --- /dev/null +++ b/src/pobsync/commands/retention_plan.py @@ -0,0 +1,104 @@ +from __future__ import annotations + +from datetime import datetime, timezone +from pathlib import Path +from typing import Any, Dict, List + +from ..config.load import load_global_config, load_host_config +from ..config.merge import build_effective_config +from ..errors import ConfigError +from ..paths import PobsyncPaths +from ..retention import Snapshot, build_retention_plan +from ..snapshot_meta import iter_snapshot_dirs, read_snapshot_meta, resolve_host_root +from ..util import sanitize_host + + +def _parse_snapshot_dt(dirname: str, meta: dict) -> datetime: + ts = meta.get("started_at") + if isinstance(ts, str) and ts.endswith("Z"): + try: + return datetime.strptime(ts, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=timezone.utc) + except ValueError: + pass + + # fallback: dirname YYYYMMDD-HHMMSSZ__ID + try: + prefix = dirname.split("__", 1)[0] + return datetime.strptime(prefix, "%Y%m%d-%H%M%SZ").replace(tzinfo=timezone.utc) + except Exception: + return datetime.fromtimestamp(0, tz=timezone.utc) + + +def run_retention_plan(prefix: Path, host: str, kind: str) -> dict[str, Any]: + host = sanitize_host(host) + + if kind not in {"scheduled", "manual", "all"}: + raise ConfigError("kind must be scheduled, manual, or all") + + paths = PobsyncPaths(home=prefix) + + global_cfg = load_global_config(paths.global_config_path) + host_cfg = load_host_config(paths.hosts_dir / f"{host}.yaml") + cfg = build_effective_config(global_cfg, host_cfg) + + retention = cfg.get("retention") + if not isinstance(retention, dict): + raise ConfigError("No retention config found") + + 
backup_root = cfg.get("backup_root") + if not isinstance(backup_root, str) or not backup_root.startswith("/"): + raise ConfigError("Invalid backup_root in config") + + host_root = resolve_host_root(backup_root, host) + + kinds: List[str] + if kind == "all": + kinds = ["scheduled", "manual"] + else: + kinds = [kind] + + snapshots: List[Snapshot] = [] + + for kk in kinds: + for d in iter_snapshot_dirs(host_root, kk): + meta = read_snapshot_meta(d) + dt = _parse_snapshot_dt(d.name, meta) + + snapshots.append( + Snapshot( + kind=kk, + dirname=d.name, + path=str(d), + dt=dt, + status=meta.get("status"), + base=meta.get("base"), + ) + ) + + plan = build_retention_plan( + snapshots=snapshots, + retention=retention, + now=datetime.now(timezone.utc), + ) + + delete = [s for s in snapshots if s.dirname not in plan.keep] + + return { + "ok": True, + "host": host, + "kind": kind, + "retention": retention, + "keep": sorted(plan.keep), + "delete": [ + { + "dirname": s.dirname, + "kind": s.kind, + "path": s.path, + "dt": s.dt.isoformat(), + "status": s.status, + } + for s in delete + ], + "reasons": plan.reasons, + } + diff --git a/src/pobsync/retention.py b/src/pobsync/retention.py new file mode 100644 index 0000000..cd9a22a --- /dev/null +++ b/src/pobsync/retention.py @@ -0,0 +1,125 @@ +from __future__ import annotations + +from dataclasses import dataclass +from datetime import datetime, timedelta, timezone +from typing import Any, Dict, Iterable, List, Optional, Set, Tuple + + +@dataclass(frozen=True) +class Snapshot: + kind: str # scheduled | manual + dirname: str + path: str + dt: datetime # UTC + status: Optional[str] + base: Optional[dict] + + +@dataclass +class RetentionResult: + keep: Set[str] # dirnames + reasons: Dict[str, List[str]] + + +def _bucket_day(dt: datetime) -> str: + return dt.strftime("%Y-%m-%d") + + +def _bucket_week(dt: datetime) -> str: + iso = dt.isocalendar() + return f"{iso.year}-W{iso.week:02d}" + + +def _bucket_month(dt: datetime) -> str: + return 
dt.strftime("%Y-%m") + + + def _bucket_year(dt: datetime) -> str: + return dt.strftime("%Y") + + + def _window_start(now: datetime, unit: str, count: int) -> datetime: + if count <= 0: + return now + timedelta(days=1) + + if unit == "daily": + return (now - timedelta(days=count - 1)).replace(hour=0, minute=0, second=0, microsecond=0) + if unit == "weekly": + return (now - timedelta(days=now.isoweekday() - 1, weeks=count - 1)).replace(hour=0, minute=0, second=0, microsecond=0)  # Monday 00:00 of the ISO week; bare "now - weeks" made weekly=1 keep nothing + if unit == "monthly": + return now.replace(day=1) - timedelta(days=32 * (count - 1)) + if unit == "yearly": + return now.replace(month=1, day=1) - timedelta(days=366 * (count - 1)) + + raise ValueError(unit) + + + def build_retention_plan( + snapshots: Iterable[Snapshot], + retention: Dict[str, int], + now: Optional[datetime] = None, + ) -> RetentionResult: + """ + Build a dry-run retention plan. + Returns: + - keep: set of snapshot dirnames to keep + - reasons: mapping dirname -> list of reasons why it is kept + """ + if now is None: + now = datetime.now(timezone.utc) + + snaps = sorted(snapshots, key=lambda s: s.dt, reverse=True) + + keep: Set[str] = set() + reasons: Dict[str, List[str]] = {} + + def mark(dirname: str, reason: str) -> None: + keep.add(dirname) + reasons.setdefault(dirname, []).append(reason) + + # Always keep newest snapshot overall (if any) + if snaps: + mark(snaps[0].dirname, "newest") + + # Retention buckets + rules = [ + ("daily", retention.get("daily", 0), _bucket_day), + ("weekly", retention.get("weekly", 0), _bucket_week), + ("monthly", retention.get("monthly", 0), _bucket_month), + ("yearly", retention.get("yearly", 0), _bucket_year), + ] + + for name, count, bucket_fn in rules: + if count <= 0: + continue + + window_start = _window_start(now, name, count) + seen: Set[str] = set() + + for s in snaps: + if s.dt < window_start: + break + + bucket = bucket_fn(s.dt) + if bucket in seen: + continue + + # Prefer successful snapshots, but allow fallback + if s.status not in (None, "success"): + continue + + seen.add(bucket) + mark(s.dirname, 
f"{name}:{bucket}") + + # Fallback: if a bucket had no success, allow newest non-success + for s in snaps: + if s.dt < window_start: + break + bucket = bucket_fn(s.dt) + if bucket in seen: + continue + seen.add(bucket) + mark(s.dirname, f"{name}:{bucket}:fallback") + + return RetentionResult(keep=keep, reasons=reasons) +