feat(snapshots): add list/show commands and snapshot_meta helpers
This commit is contained in:
@@ -9,9 +9,11 @@ from .commands.doctor import run_doctor
|
||||
from .commands.init_host import run_init_host
|
||||
from .commands.install import run_install
|
||||
from .commands.list_remotes import run_list_remotes
|
||||
from .commands.show_config import run_show_config, dump_yaml
|
||||
from .commands.run_scheduled import run_scheduled
|
||||
from .errors import ConfigError, DoctorError, InstallError, PobsyncError, LockError
|
||||
from .commands.show_config import dump_yaml, run_show_config
|
||||
from .commands.snapshots_list import run_snapshots_list
|
||||
from .commands.snapshots_show import run_snapshots_show
|
||||
from .errors import ConfigError, DoctorError, InstallError, LockError, PobsyncError
|
||||
from .paths import PobsyncPaths
|
||||
from .util import is_tty, to_json_safe
|
||||
|
||||
@@ -67,6 +69,26 @@ def build_parser() -> argparse.ArgumentParser:
|
||||
rp.add_argument("--dry-run", action="store_true", help="Run rsync --dry-run without creating directories")
|
||||
rp.set_defaults(_handler=cmd_run_scheduled)
|
||||
|
||||
# snapshots
|
||||
sn = sub.add_parser("snapshots", help="Inspect snapshots (list/show)")
|
||||
sn_sub = sn.add_subparsers(dest="snapshots_cmd", required=True)
|
||||
|
||||
sn_list = sn_sub.add_parser("list", help="List snapshots for a host")
|
||||
sn_list.add_argument("host", help="Host name")
|
||||
sn_list.add_argument("--kind", default="all", help="scheduled|manual|incomplete|all (default: all)")
|
||||
sn_list.add_argument("--limit", type=int, default=20, help="Max results (default: 20)")
|
||||
sn_list.add_argument(
|
||||
"--include-incomplete",
|
||||
action="store_true",
|
||||
help="Include .incomplete when --kind=all (default: false)",
|
||||
)
|
||||
sn_list.set_defaults(_handler=cmd_snapshots_list)
|
||||
|
||||
sn_show = sn_sub.add_parser("show", help="Show snapshot metadata")
|
||||
sn_show.add_argument("host", help="Host name")
|
||||
sn_show.add_argument("--kind", required=True, help="scheduled|manual|incomplete")
|
||||
sn_show.add_argument("dirname", help="Snapshot directory name (e.g. 20260202-223807Z__K3VQEVH7)")
|
||||
sn_show.set_defaults(_handler=cmd_snapshots_show)
|
||||
|
||||
return p
|
||||
|
||||
@@ -147,6 +169,19 @@ def _print(result: dict[str, Any], as_json: bool) -> None:
|
||||
if "base" in result and result["base"]:
|
||||
print(f"- base {result['base']}")
|
||||
|
||||
# snapshots list
|
||||
if "snapshots" in result:
|
||||
for s in result["snapshots"]:
|
||||
kind = s.get("kind", "?")
|
||||
dirname = s.get("dirname", "?")
|
||||
status = s.get("status") or "unknown"
|
||||
started_at = s.get("started_at") or ""
|
||||
dur = s.get("duration_seconds")
|
||||
dur_s = f"{dur}s" if isinstance(dur, int) else ""
|
||||
extra = " ".join(x for x in [started_at, dur_s] if x)
|
||||
if extra:
|
||||
extra = " " + extra
|
||||
print(f"- {kind} {dirname} {status}{extra}")
|
||||
|
||||
|
||||
def cmd_install(args: argparse.Namespace) -> int:
|
||||
@@ -181,6 +216,7 @@ def cmd_init_host(args: argparse.Namespace) -> int:
|
||||
# In phase 1 we require retention explicitly or via install default.
|
||||
# We'll read global.yaml if present to fetch retention_defaults.
|
||||
from .config.load import load_global_config
|
||||
|
||||
paths = PobsyncPaths(home=prefix)
|
||||
global_cfg = load_global_config(paths.global_config_path)
|
||||
retention = global_cfg.get("retention_defaults") or {"daily": 14, "weekly": 8, "monthly": 12, "yearly": 0}
|
||||
@@ -239,6 +275,33 @@ def cmd_run_scheduled(args: argparse.Namespace) -> int:
|
||||
return 0 if result.get("ok") else 2
|
||||
|
||||
|
||||
def cmd_snapshots_list(args: argparse.Namespace) -> int:
|
||||
prefix = Path(args.prefix)
|
||||
result = run_snapshots_list(
|
||||
prefix=prefix,
|
||||
host=args.host,
|
||||
kind=args.kind,
|
||||
limit=int(args.limit),
|
||||
include_incomplete=bool(args.include_incomplete),
|
||||
)
|
||||
_print(result, as_json=bool(args.json))
|
||||
return 0 if result.get("ok") else 1
|
||||
|
||||
|
||||
def cmd_snapshots_show(args: argparse.Namespace) -> int:
|
||||
prefix = Path(args.prefix)
|
||||
result = run_snapshots_show(prefix=prefix, host=args.host, kind=args.kind, dirname=args.dirname)
|
||||
|
||||
if args.json:
|
||||
_print(result, as_json=True)
|
||||
else:
|
||||
# Similar to show-config: dump YAML for the meta block.
|
||||
print(dump_yaml(result.get("meta", {})).rstrip())
|
||||
if result.get("log_path"):
|
||||
print(f"\n# rsync.log: {result['log_path']}")
|
||||
|
||||
return 0 if result.get("ok") else 1
|
||||
|
||||
|
||||
def main(argv: list[str] | None = None) -> int:
|
||||
parser = build_parser()
|
||||
@@ -247,6 +310,7 @@ def main(argv: list[str] | None = None) -> int:
|
||||
try:
|
||||
handler = getattr(args, "_handler")
|
||||
return int(handler(args))
|
||||
|
||||
except PobsyncError as e:
|
||||
if args.json:
|
||||
_print({"ok": False, "error": str(e), "type": type(e).__name__}, as_json=True)
|
||||
|
||||
77
src/pobsync/commands/snapshots_list.py
Normal file
77
src/pobsync/commands/snapshots_list.py
Normal file
@@ -0,0 +1,77 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
from ..config.load import load_global_config, load_host_config
|
||||
from ..config.merge import build_effective_config
|
||||
from ..errors import ConfigError
|
||||
from ..paths import PobsyncPaths
|
||||
from ..snapshot_meta import iter_snapshot_dirs, normalize_kind, read_snapshot_meta, resolve_host_root
|
||||
from ..util import sanitize_host
|
||||
|
||||
|
||||
def run_snapshots_list(prefix: Path, host: str, kind: str, limit: int, include_incomplete: bool) -> dict[str, Any]:
|
||||
host = sanitize_host(host)
|
||||
k = normalize_kind(kind)
|
||||
|
||||
if limit < 1:
|
||||
raise ConfigError("--limit must be >= 1")
|
||||
|
||||
paths = PobsyncPaths(home=prefix)
|
||||
|
||||
global_cfg = load_global_config(paths.global_config_path)
|
||||
host_cfg = load_host_config(paths.hosts_dir / f"{host}.yaml")
|
||||
cfg = build_effective_config(global_cfg, host_cfg)
|
||||
|
||||
backup_root = cfg.get("backup_root")
|
||||
if not isinstance(backup_root, str) or not backup_root.startswith("/"):
|
||||
raise ConfigError("Invalid backup_root in effective config")
|
||||
|
||||
host_root = resolve_host_root(backup_root, host)
|
||||
|
||||
kinds: list[str]
|
||||
if k == "all":
|
||||
kinds = ["scheduled", "manual"]
|
||||
if include_incomplete:
|
||||
kinds.append("incomplete")
|
||||
else:
|
||||
kinds = [k]
|
||||
|
||||
out: list[dict[str, Any]] = []
|
||||
remaining = limit
|
||||
|
||||
for kk in kinds:
|
||||
if remaining <= 0:
|
||||
break
|
||||
|
||||
for d in iter_snapshot_dirs(host_root, kk):
|
||||
if remaining <= 0:
|
||||
break
|
||||
|
||||
meta = read_snapshot_meta(d)
|
||||
|
||||
out.append(
|
||||
{
|
||||
"kind": kk,
|
||||
"dirname": d.name,
|
||||
"path": str(d),
|
||||
"status": meta.get("status"),
|
||||
"started_at": meta.get("started_at"),
|
||||
"ended_at": meta.get("ended_at"),
|
||||
"duration_seconds": meta.get("duration_seconds"),
|
||||
"base": meta.get("base"),
|
||||
"id": meta.get("id"),
|
||||
}
|
||||
)
|
||||
remaining -= 1
|
||||
|
||||
return {
|
||||
"ok": True,
|
||||
"host": host,
|
||||
"kind": k,
|
||||
"include_incomplete": bool(include_incomplete),
|
||||
"limit": limit,
|
||||
"snapshots": out,
|
||||
}
|
||||
|
||||
49
src/pobsync/commands/snapshots_show.py
Normal file
49
src/pobsync/commands/snapshots_show.py
Normal file
@@ -0,0 +1,49 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
from ..config.load import load_global_config, load_host_config
|
||||
from ..config.merge import build_effective_config
|
||||
from ..errors import ConfigError
|
||||
from ..paths import PobsyncPaths
|
||||
from ..snapshot_meta import build_snapshot_ref, normalize_kind, read_snapshot_meta, resolve_host_root, snapshot_log_path
|
||||
from ..util import sanitize_host
|
||||
|
||||
|
||||
def run_snapshots_show(prefix: Path, host: str, kind: str, dirname: str) -> dict[str, Any]:
|
||||
host = sanitize_host(host)
|
||||
k = normalize_kind(kind)
|
||||
if k == "all":
|
||||
raise ConfigError("kind must be scheduled, manual, or incomplete for show")
|
||||
|
||||
paths = PobsyncPaths(home=prefix)
|
||||
|
||||
global_cfg = load_global_config(paths.global_config_path)
|
||||
host_cfg = load_host_config(paths.hosts_dir / f"{host}.yaml")
|
||||
cfg = build_effective_config(global_cfg, host_cfg)
|
||||
|
||||
backup_root = cfg.get("backup_root")
|
||||
if not isinstance(backup_root, str) or not backup_root.startswith("/"):
|
||||
raise ConfigError("Invalid backup_root in effective config")
|
||||
|
||||
host_root = resolve_host_root(backup_root, host)
|
||||
|
||||
ref = build_snapshot_ref(host=host, host_root=host_root, kind=k, dirname=dirname)
|
||||
if not ref.path.exists():
|
||||
raise ConfigError(f"Snapshot not found: {k}/{dirname}")
|
||||
|
||||
meta = read_snapshot_meta(ref.path)
|
||||
log = snapshot_log_path(ref.path)
|
||||
|
||||
return {
|
||||
"ok": True,
|
||||
"host": host,
|
||||
"kind": k,
|
||||
"dirname": dirname,
|
||||
"path": str(ref.path),
|
||||
"meta_path": str(ref.path / "meta" / "meta.yaml"),
|
||||
"log_path": str(log) if log.exists() else None,
|
||||
"meta": meta,
|
||||
}
|
||||
|
||||
98
src/pobsync/snapshot_meta.py
Normal file
98
src/pobsync/snapshot_meta.py
Normal file
@@ -0,0 +1,98 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import Any, Iterable
|
||||
|
||||
from .errors import ConfigError
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class SnapshotRef:
|
||||
host: str
|
||||
kind: str # scheduled | manual | incomplete
|
||||
dirname: str
|
||||
path: Path
|
||||
|
||||
|
||||
def snapshot_meta_path(snapshot_dir: Path) -> Path:
|
||||
return snapshot_dir / "meta" / "meta.yaml"
|
||||
|
||||
|
||||
def snapshot_log_path(snapshot_dir: Path) -> Path:
|
||||
return snapshot_dir / "meta" / "rsync.log"
|
||||
|
||||
|
||||
def read_snapshot_meta(snapshot_dir: Path) -> dict[str, Any]:
|
||||
"""
|
||||
Read meta/meta.yaml for a snapshot directory.
|
||||
Returns {} if missing/unreadable YAML; callers can decide how strict to be.
|
||||
"""
|
||||
import yaml # type: ignore[import-not-found]
|
||||
|
||||
p = snapshot_meta_path(snapshot_dir)
|
||||
if not p.exists():
|
||||
return {}
|
||||
|
||||
try:
|
||||
data = yaml.safe_load(p.read_text(encoding="utf-8"))
|
||||
if data is None:
|
||||
return {}
|
||||
if not isinstance(data, dict):
|
||||
return {}
|
||||
return data
|
||||
except OSError:
|
||||
return {}
|
||||
except Exception:
|
||||
# YAML parse errors should not crash listing; return empty meta.
|
||||
return {}
|
||||
|
||||
|
||||
def iter_snapshot_dirs(host_root: Path, kind: str) -> Iterable[Path]:
|
||||
"""
|
||||
Yield snapshot directories for a given host root and kind.
|
||||
kind: scheduled|manual|incomplete
|
||||
"""
|
||||
if kind == "scheduled":
|
||||
parent = host_root / "scheduled"
|
||||
elif kind == "manual":
|
||||
parent = host_root / "manual"
|
||||
elif kind == "incomplete":
|
||||
parent = host_root / ".incomplete"
|
||||
else:
|
||||
raise ValueError(f"Invalid kind: {kind!r}")
|
||||
|
||||
if not parent.exists():
|
||||
return []
|
||||
|
||||
# Snapshot dirs are named <ts>__<id> and sorted lexicographically == chronological
|
||||
dirs = [p for p in parent.iterdir() if p.is_dir()]
|
||||
dirs.sort(reverse=True)
|
||||
return dirs
|
||||
|
||||
|
||||
def build_snapshot_ref(host: str, host_root: Path, kind: str, dirname: str) -> SnapshotRef:
|
||||
if kind == "scheduled":
|
||||
p = host_root / "scheduled" / dirname
|
||||
elif kind == "manual":
|
||||
p = host_root / "manual" / dirname
|
||||
elif kind == "incomplete":
|
||||
p = host_root / ".incomplete" / dirname
|
||||
else:
|
||||
raise ValueError(f"Invalid kind: {kind!r}")
|
||||
|
||||
return SnapshotRef(host=host, kind=kind, dirname=dirname, path=p)
|
||||
|
||||
|
||||
def resolve_host_root(backup_root: str, host: str) -> Path:
|
||||
if not backup_root.startswith("/"):
|
||||
raise ConfigError("backup_root must be an absolute path")
|
||||
return Path(backup_root) / host
|
||||
|
||||
|
||||
def normalize_kind(kind: str) -> str:
|
||||
k = kind.strip().lower()
|
||||
if k in {"scheduled", "manual", "incomplete", "all"}:
|
||||
return k
|
||||
raise ConfigError("kind must be one of: scheduled, manual, incomplete, all")
|
||||
|
||||
Reference in New Issue
Block a user