add a few extra commands

This commit is contained in:
2026-02-03 11:11:37 +01:00
parent ce7eba9119
commit f67abdcc91
12 changed files with 711 additions and 17 deletions

3
.gitignore vendored
View File

@@ -1,2 +1,3 @@
__pycache__
*egg-info
*egg-info
build/

View File

@@ -8,7 +8,10 @@ from typing import Any
from .commands.doctor import run_doctor
from .commands.init_host import run_init_host
from .commands.install import run_install
from .errors import ConfigError, DoctorError, InstallError, PobsyncError
from .commands.list_remotes import run_list_remotes
from .commands.show_config import run_show_config, dump_yaml
from .commands.run_scheduled import run_scheduled
from .errors import ConfigError, DoctorError, InstallError, PobsyncError, LockError
from .paths import PobsyncPaths
from .util import is_tty, to_json_safe
@@ -48,6 +51,23 @@ def build_parser() -> argparse.ArgumentParser:
dp.add_argument("--rsync-dry-run", action="store_true", help="Try rsync dry run (phase 2)")
dp.set_defaults(_handler=cmd_doctor)
# list remotes
lp = sub.add_parser("list-remotes", help="List configured remotes (host configs)")
lp.set_defaults(_handler=cmd_list_remotes)
# show config
sp = sub.add_parser("show-config", help="Show host configuration (raw or effective)")
sp.add_argument("host", help="Host to show")
sp.add_argument("--effective", action="store_true", help="Show merged effective config")
sp.set_defaults(_handler=cmd_show_config)
# run scheduled
rp = sub.add_parser("run-scheduled", help="Run a scheduled backup for a host")
rp.add_argument("host", help="Host to back up")
rp.add_argument("--dry-run", action="store_true", help="Run rsync --dry-run without creating directories")
rp.set_defaults(_handler=cmd_run_scheduled)
return p
@@ -79,20 +99,30 @@ def _print(result: dict[str, Any], as_json: bool) -> None:
if as_json:
print(json.dumps(to_json_safe(result), indent=2, sort_keys=False))
return
# Minimal human output for phase 1
# Minimal human output
if result.get("ok") is True:
print("OK")
else:
print("FAILED")
# Standard action list
if "actions" in result:
for a in result["actions"]:
print(f"- {a}")
# Single action (e.g. init-host)
if "action" in result:
print(f"- {result['action']}")
# Doctor-style results list
if "results" in result:
for r in result["results"]:
ok = r.get("ok", False)
label = "OK" if ok else "FAIL"
name = r.get("check", "check")
msg = r.get("message") or r.get("error") or ""
extra = ""
if "path" in r:
extra = f" ({r['path']})"
@@ -100,7 +130,23 @@ def _print(result: dict[str, Any], as_json: bool) -> None:
extra = f" ({r['name']})"
elif "host" in r:
extra = f" ({r['host']})"
print(f"- {label} {name}{extra} {msg}".rstrip())
line = f"- {label} {name}{extra}"
if msg:
line += f" {msg}"
print(line)
# list-remotes style output
if "hosts" in result:
for h in result["hosts"]:
print(h)
if "snapshot" in result:
print(f"- snapshot {result['snapshot']}")
if "base" in result and result["base"]:
print(f"- base {result['base']}")
def cmd_install(args: argparse.Namespace) -> int:
@@ -160,6 +206,18 @@ def cmd_init_host(args: argparse.Namespace) -> int:
return 0 if result.get("ok") else 1
def cmd_show_config(args: argparse.Namespace) -> int:
    # Handler for `show-config`: load one host's config (raw, or merged with
    # global defaults when --effective) and print it.
    prefix = Path(args.prefix)
    result = run_show_config(prefix=prefix, host=args.host, effective=bool(args.effective))
    if args.json:
        _print(result, as_json=True)
    else:
        # Human output: only the config mapping itself, rendered as YAML.
        print(dump_yaml(result["config"]).rstrip())
    return 0 if result.get("ok") else 1
def cmd_doctor(args: argparse.Namespace) -> int:
prefix = Path(args.prefix)
result = run_doctor(prefix=prefix, host=args.host, connect=bool(args.connect), rsync_dry_run=bool(args.rsync_dry_run))
@@ -167,6 +225,21 @@ def cmd_doctor(args: argparse.Namespace) -> int:
return 0 if result.get("ok") else 1
def cmd_list_remotes(args: argparse.Namespace) -> int:
    # Handler for `list-remotes`: print hosts that have a host config file.
    prefix = Path(args.prefix)
    result = run_list_remotes(prefix=prefix)
    _print(result, as_json=bool(args.json))
    return 0 if result.get("ok") else 1
def cmd_run_scheduled(args: argparse.Namespace) -> int:
    # Handler for `run-scheduled`: perform (or --dry-run) a scheduled backup.
    prefix = Path(args.prefix)
    result = run_scheduled(prefix=prefix, host=args.host, dry_run=bool(args.dry_run))
    _print(result, as_json=bool(args.json))
    # NOTE(review): failure exits with 2 here while sibling handlers use 1 —
    # presumably to distinguish backup failures from other errors; confirm intended.
    return 0 if result.get("ok") else 2
def main(argv: list[str] | None = None) -> int:
parser = build_parser()
args = parser.parse_args(argv)
@@ -179,7 +252,10 @@ def main(argv: list[str] | None = None) -> int:
_print({"ok": False, "error": str(e), "type": type(e).__name__}, as_json=True)
else:
print(f"ERROR: {e}")
if isinstance(e, LockError):
return 10
return 1
except KeyboardInterrupt:
if args.json:
_print({"ok": False, "error": "interrupted"}, as_json=True)

View File

@@ -34,7 +34,6 @@ DEFAULT_RSYNC_ARGS = [
"--delete-excluded",
"--partial",
"--partial-dir=.rsync-partial",
"--inplace",
"--one-file-system",
"--relative",
"--human-readable",

View File

@@ -0,0 +1,24 @@
from __future__ import annotations
from pathlib import Path
from ..paths import PobsyncPaths
from ..util import sanitize_host
def run_list_remotes(prefix: Path) -> dict:
    """List hosts that have a config file under the hosts directory.

    A host is included when a ``<host>.yaml`` file exists and its stem is a
    valid host name. Returns ``{"ok": True, "hosts": [...]}``.
    """
    paths = PobsyncPaths(home=prefix)
    found: list[str] = []
    if paths.hosts_dir.exists():
        for cfg_file in sorted(paths.hosts_dir.glob("*.yaml")):
            name = cfg_file.stem
            try:
                sanitize_host(name)
            except Exception:
                # Skip files whose stem is not a valid host name;
                # `doctor` reports such config problems separately.
                continue
            found.append(name)
    return {"ok": True, "hosts": found}

View File

@@ -0,0 +1,241 @@
from __future__ import annotations
from pathlib import Path
from typing import Any
import yaml
from ..config.load import load_global_config, load_host_config
from ..config.merge import build_effective_config
from ..errors import ConfigError
from ..lock import acquire_host_lock
from ..paths import PobsyncPaths
from ..rsync import build_rsync_command, build_ssh_command, run_rsync
from ..snapshot import (
HostBackupDirs,
extract_ts_and_id_from_dirname,
format_iso_z,
generate_id,
list_snapshot_dirs,
snapshot_dir_name,
utc_now,
)
from ..util import ensure_dir, realpath_startswith, sanitize_host
def _host_backup_dirs(backup_root: str, host: str) -> HostBackupDirs:
    """Directory layout for *host*'s snapshots beneath *backup_root*."""
    host_root = Path(backup_root) / host
    return HostBackupDirs(root=host_root)
def _find_latest_snapshot(parent: Path) -> Path | None:
    """
    Best-effort latest snapshot selection by timestamp in dirname.
    Meta parsing comes later.
    """
    if not parent.exists():
        return None
    dated: list[tuple[Any, Path]] = []
    for candidate in list_snapshot_dirs(parent):
        ts, _sid = extract_ts_and_id_from_dirname(candidate.name)
        # Directories whose names don't carry a parseable timestamp are skipped.
        if ts is not None:
            dated.append((ts, candidate))
    if not dated:
        return None
    # max() keeps the first of equal timestamps, matching a strict ">" scan.
    return max(dated, key=lambda pair: pair[0])[1]
def select_scheduled_base(dirs: HostBackupDirs) -> Path | None:
    """
    Base selection rule:
      scheduled -> manual -> none
    """
    for parent in (dirs.scheduled, dirs.manual):
        found = _find_latest_snapshot(parent)
        if found is not None:
            return found
    return None
def write_meta(path: Path, data: dict[str, Any]) -> None:
    """Serialize *data* as YAML (insertion order preserved) to *path*."""
    text = yaml.safe_dump(data, sort_keys=False)
    path.write_text(text, encoding="utf-8")
def run_scheduled(prefix: Path, host: str, dry_run: bool) -> dict[str, Any]:
    """
    Execute (or dry-run) a scheduled backup for *host*.

    Loads the merged global+host config, picks the most recent snapshot as the
    rsync --link-dest base, runs rsync into a hidden .incomplete directory and,
    on success, renames the snapshot into the scheduled directory.

    Returns a JSON-safe result dict. Raises ConfigError for invalid effective
    config; acquire_host_lock raises LockError when the host is already locked.
    """
    host = sanitize_host(host)
    paths = PobsyncPaths(home=prefix)
    # Load and merge config
    global_cfg = load_global_config(paths.global_config_path)
    host_cfg = load_host_config(paths.hosts_dir / f"{host}.yaml")
    cfg = build_effective_config(global_cfg, host_cfg)
    backup_root = cfg.get("backup_root")
    if not isinstance(backup_root, str) or not backup_root.startswith("/"):
        raise ConfigError("Invalid backup_root in effective config")
    dirs = _host_backup_dirs(backup_root, host)
    # Absolute safety check: refuse if the host dir's real path escapes
    # backup_root (e.g. via symlinks).
    if not realpath_startswith(dirs.root, Path(backup_root)):
        raise ConfigError("Refusing to operate outside backup_root")
    # Base snapshot (absolute path); None means full (un-hardlinked) copy.
    base_dir = select_scheduled_base(dirs)
    link_dest = str(base_dir) if base_dir else None
    # Effective ssh/rsync settings; `or` guards turn explicit None into defaults.
    ssh_cfg = cfg.get("ssh", {}) or {}
    rsync_cfg = cfg.get("rsync", {}) or {}
    rsync_binary = rsync_cfg.get("binary", "rsync")
    rsync_args = rsync_cfg.get("args_effective") or rsync_cfg.get("args") or []
    timeout_seconds = int(rsync_cfg.get("timeout_seconds", 0) or 0)
    bwlimit_kbps = int(rsync_cfg.get("bwlimit_kbps", 0) or 0)
    source_root = cfg.get("source_root") or cfg.get("defaults", {}).get("source_root", "/")
    if not isinstance(source_root, str) or not source_root.startswith("/"):
        raise ConfigError("Invalid source_root in effective config")
    address = cfg.get("address")
    if not isinstance(address, str) or not address:
        raise ConfigError("Invalid address in host config")
    user = ssh_cfg.get("user") or "root"
    source = f"{user}@{address}:{source_root}"
    excludes = cfg.get("excludes_effective", []) or []
    includes = cfg.get("includes", []) or []
    ssh_cmd = build_ssh_command(ssh_cfg)
    # ------------------------------------------------------------
    # DRY RUN
    # ------------------------------------------------------------
    if dry_run:
        # Dry runs never touch the real backup tree and take no host lock;
        # rsync output goes to a scratch path under /tmp.
        dest = f"/tmp/pobsync-dryrun/{host}/"
        dryrun_log = Path(f"/tmp/pobsync-dryrun/{host}/rsync.log")
        cmd = build_rsync_command(
            rsync_binary=str(rsync_binary),
            rsync_args=list(rsync_args),
            ssh_cmd=ssh_cmd,
            source=source,
            dest=dest,
            link_dest=link_dest,
            dry_run=True,
            timeout_seconds=timeout_seconds,
            bwlimit_kbps=bwlimit_kbps,
            extra_excludes=list(excludes),
            extra_includes=list(includes),
        )
        result = run_rsync(cmd, log_path=dryrun_log, timeout_seconds=timeout_seconds)
        return {
            "ok": result.exit_code == 0,
            "dry_run": True,
            "host": host,
            "base": str(base_dir) if base_dir else None,
            "log": str(dryrun_log),
            "rsync": {
                "exit_code": result.exit_code,
                "command": result.command,
            },
        }
    # ------------------------------------------------------------
    # REAL RUN
    # ------------------------------------------------------------
    with acquire_host_lock(paths.locks_dir, host, command="run-scheduled"):
        ts = utc_now()
        snap_id = generate_id(8)
        snap_name = snapshot_dir_name(ts, snap_id)
        ensure_dir(dirs.root)
        ensure_dir(dirs.scheduled)
        ensure_dir(dirs.manual)
        ensure_dir(dirs.incomplete)
        # Snapshots are built under .incomplete and only renamed into the
        # scheduled directory once rsync succeeds.
        incomplete_dir = dirs.incomplete / snap_name
        data_dir = incomplete_dir / "data"
        meta_dir = incomplete_dir / "meta"
        ensure_dir(data_dir)
        ensure_dir(meta_dir)
        meta_path = meta_dir / "meta.yaml"
        log_path = meta_dir / "rsync.log"
        # Write a "running" meta record up front so a crash leaves evidence.
        # NOTE(review): base_snapshot stays None here and is never updated even
        # when base_dir was selected — looks like it should record the base;
        # confirm against the meta schema.
        meta: dict[str, Any] = {
            "id": snap_id,
            "host": host,
            "type": "scheduled",
            "label": None,
            "status": "running",
            "started_at": format_iso_z(ts),
            "ended_at": None,
            "base_snapshot": None,
            "rsync": {"exit_code": None, "stats": {}},
            "overrides": {"includes": [], "excludes": [], "base": None},
        }
        log_path.touch(exist_ok=True)
        write_meta(meta_path, meta)
        dest = str(data_dir) + "/"
        cmd = build_rsync_command(
            rsync_binary=str(rsync_binary),
            rsync_args=list(rsync_args),
            ssh_cmd=ssh_cmd,
            source=source,
            dest=dest,
            link_dest=link_dest,
            dry_run=False,
            timeout_seconds=timeout_seconds,
            bwlimit_kbps=bwlimit_kbps,
            extra_excludes=list(excludes),
            extra_includes=list(includes),
        )
        result = run_rsync(cmd, log_path=log_path, timeout_seconds=timeout_seconds)
        end_ts = utc_now()
        meta["ended_at"] = format_iso_z(end_ts)
        meta["rsync"]["exit_code"] = result.exit_code
        meta["status"] = "success" if result.exit_code == 0 else "failed"
        write_meta(meta_path, meta)
        # run_rsync is supposed to guarantee the log exists; a missing log is
        # treated as a hard failure and the meta record is overwritten.
        if not log_path.exists():
            meta["status"] = "failed"
            meta["rsync"]["exit_code"] = 99
            write_meta(meta_path, meta)
            return {
                "ok": False,
                "dry_run": False,
                "host": host,
                "snapshot": str(incomplete_dir),
                "error": "rsync.log missing after execution",
            }
        if result.exit_code != 0:
            # Failed snapshot stays under .incomplete for later inspection.
            return {
                "ok": False,
                "dry_run": False,
                "host": host,
                "snapshot": str(incomplete_dir),
                "status": meta["status"],
                "rsync": {"exit_code": result.exit_code},
            }
        # Success: publish the snapshot with a single rename.
        final_dir = dirs.scheduled / snap_name
        incomplete_dir.rename(final_dir)
        return {
            "ok": True,
            "dry_run": False,
            "host": host,
            "snapshot": str(final_dir),
            "base": str(base_dir) if base_dir else None,
            "rsync": {"exit_code": result.exit_code},
        }

View File

@@ -0,0 +1,33 @@
from __future__ import annotations
from pathlib import Path
from typing import Any
import yaml
from ..config.load import load_global_config, load_host_config
from ..config.merge import build_effective_config
from ..paths import PobsyncPaths
from ..util import sanitize_host
def run_show_config(prefix: Path, host: str, effective: bool) -> dict[str, Any]:
    """Load one host's config, merged with global defaults when *effective*.

    The global config is always loaded (and thus validated) even for the raw
    view. Returns a result dict with the selected config under "config".
    """
    host = sanitize_host(host)
    paths = PobsyncPaths(home=prefix)
    global_cfg = load_global_config(paths.global_config_path)
    host_cfg = load_host_config(paths.hosts_dir / f"{host}.yaml")
    if effective:
        cfg = build_effective_config(global_cfg, host_cfg)
    else:
        cfg = host_cfg
    return {
        "ok": True,
        "host": host,
        "effective": effective,
        "config": cfg,
    }
def dump_yaml(data: Any) -> str:
    """Render *data* as YAML, preserving insertion order of mappings."""
    text = yaml.safe_dump(data, sort_keys=False)
    return text

View File

@@ -8,6 +8,20 @@ from ..validate import FieldSpec, Schema
HOST_RE = re.compile(r"^[A-Za-z0-9._-]+$")
# ----------------------------
# Shared / reusable schemas
# ----------------------------
RETENTION_SCHEMA = Schema(
fields={
"daily": FieldSpec(int, required=True, min_value=0),
"weekly": FieldSpec(int, required=True, min_value=0),
"monthly": FieldSpec(int, required=True, min_value=0),
"yearly": FieldSpec(int, required=True, min_value=0),
},
allow_unknown=False,
)
SSH_SCHEMA = Schema(
fields={
"user": FieldSpec(str, required=False),
@@ -51,11 +65,21 @@ LOGGING_SCHEMA = Schema(
OUTPUT_SCHEMA = Schema(
fields={
"default_format": FieldSpec(str, required=False, default="human", enum={"human", "json"}),
"default_format": FieldSpec(
str,
required=False,
default="human",
enum={"human", "json"},
),
},
allow_unknown=False,
)
# ----------------------------
# Global config schema
# ----------------------------
GLOBAL_SCHEMA = Schema(
fields={
"backup_root": FieldSpec(str, required=True),
@@ -63,22 +87,28 @@ GLOBAL_SCHEMA = Schema(
"ssh": FieldSpec(dict, required=False, schema=SSH_SCHEMA),
"rsync": FieldSpec(dict, required=False, schema=RSYNC_SCHEMA),
"defaults": FieldSpec(dict, required=False, schema=DEFAULTS_SCHEMA),
"excludes_default": FieldSpec(list, required=False, default=[], item=FieldSpec(str)),
"excludes_default": FieldSpec(
list,
required=False,
default=[],
item=FieldSpec(str),
),
"logging": FieldSpec(dict, required=False, schema=LOGGING_SCHEMA),
"output": FieldSpec(dict, required=False, schema=OUTPUT_SCHEMA),
# Used by `init-host` as a convenience default
"retention_defaults": FieldSpec(
dict,
required=False,
schema=RETENTION_SCHEMA,
),
},
allow_unknown=False,
)
RETENTION_SCHEMA = Schema(
fields={
"daily": FieldSpec(int, required=True, min_value=0),
"weekly": FieldSpec(int, required=True, min_value=0),
"monthly": FieldSpec(int, required=True, min_value=0),
"yearly": FieldSpec(int, required=True, min_value=0),
},
allow_unknown=False,
)
# ----------------------------
# Host config schema
# ----------------------------
HOST_RSYNC_SCHEMA = Schema(
fields={

View File

@@ -21,3 +21,7 @@ class InstallError(PobsyncError):
class DoctorError(PobsyncError):
"""Raised when doctor detects fatal issues."""
class LockError(PobsyncError):
"""Raised when a host lock is already held by another process."""

111
src/pobsync/lock.py Normal file
View File

@@ -0,0 +1,111 @@
from __future__ import annotations
import os
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from .errors import LockError
from .util import sanitize_host
def _utc_now_iso() -> str:
return datetime.now(timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z")
def _pid_is_alive(pid: int) -> bool:
if pid <= 0:
return False
try:
os.kill(pid, 0)
except ProcessLookupError:
return False
except PermissionError:
# Process exists but we might not have permission to signal it.
return True
return True
def _read_lock_pid(lock_path: Path) -> int | None:
try:
text = lock_path.read_text(encoding="utf-8")
except OSError:
return None
for line in text.splitlines():
if line.startswith("pid:"):
_, val = line.split(":", 1)
val = val.strip()
try:
return int(val)
except ValueError:
return None
return None
@dataclass
class HostLock:
    """Handle for an acquired host lock; usable as a context manager."""

    # Path of the lock file owned by this handle.
    lock_path: Path

    def release(self) -> None:
        """Delete the lock file; errors are swallowed (best effort).

        A leftover stale lock is merely annoying — doctor can detect it later.
        """
        try:
            self.lock_path.unlink(missing_ok=True)
        except OSError:
            pass

    def __enter__(self) -> "HostLock":
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        # Always release, even when the body raised.
        self.release()
def acquire_host_lock(locks_dir: Path, host: str, command: str) -> HostLock:
    """
    Acquire an exclusive lock for a host. Lock path: <locks_dir>/<host>.lock
    Behavior:
      - If lock exists and PID alive: raise LockError
      - If lock exists and PID not alive: remove lock and retry once
    """
    host = sanitize_host(host)
    locks_dir.mkdir(parents=True, exist_ok=True)
    lock_path = locks_dir / f"{host}.lock"

    def _try_create() -> HostLock | None:
        # O_CREAT|O_EXCL makes creation atomic: only one process can win.
        flags = os.O_WRONLY | os.O_CREAT | os.O_EXCL
        try:
            fd = os.open(str(lock_path), flags, 0o640)
        except FileExistsError:
            return None
        try:
            # Record owner pid/start/command so a later reader can decide
            # whether the lock is live or stale.
            payload = (
                f"pid: {os.getpid()}\n"
                f"started_at: {_utc_now_iso()}\n"
                f"command: {command}\n"
            )
            os.write(fd, payload.encode("utf-8"))
        finally:
            os.close(fd)
        return HostLock(lock_path=lock_path)

    lock = _try_create()
    if lock is not None:
        return lock
    # Creation failed: inspect the existing lock's recorded owner.
    pid = _read_lock_pid(lock_path)
    if pid is not None and _pid_is_alive(pid):
        raise LockError(f"Host '{host}' is already locked by pid {pid}: {lock_path}")
    # Stale lock: remove and retry once
    try:
        lock_path.unlink(missing_ok=True)
    except OSError:
        raise LockError(f"Host '{host}' lock exists but could not be removed: {lock_path}")
    lock = _try_create()
    if lock is not None:
        return lock
    # Another process re-created the lock between our unlink and retry.
    raise LockError(f"Host '{host}' is already locked: {lock_path}")

93
src/pobsync/rsync.py Normal file
View File

@@ -0,0 +1,93 @@
from __future__ import annotations
import shlex
import subprocess
from dataclasses import dataclass
from pathlib import Path
from typing import Sequence
@dataclass(frozen=True)
class RsyncResult:
    """Outcome of one rsync invocation."""

    # Process exit status (124 when run_rsync hit its timeout).
    exit_code: int
    # The exact argv that was executed, for logging/JSON output.
    command: list[str]
def build_ssh_command(ssh_cfg: dict) -> list[str]:
    """Build the ssh argv used as rsync's remote shell (-e)."""
    parts = ["ssh"]
    options = ssh_cfg.get("options") or []
    parts.extend(str(opt) for opt in options)
    port = ssh_cfg.get("port")
    # A falsy port (None / 0) means "use ssh's default".
    if port:
        parts += ["-p", str(port)]
    return parts
def build_rsync_command(
    *,
    rsync_binary: str,
    rsync_args: Sequence[str],
    ssh_cmd: Sequence[str],
    source: str,
    dest: str,
    link_dest: str | None,
    dry_run: bool,
    timeout_seconds: int,
    bwlimit_kbps: int,
    extra_excludes: Sequence[str],
    extra_includes: Sequence[str],
) -> list[str]:
    """Assemble the full rsync argv.

    Includes/excludes are kept simple: includes come first (caller is
    responsible for rsync's include/exclude ordering semantics), then
    excludes as --exclude=PATTERN. Transport is passed via -e 'ssh ...'.
    """
    cmd: list[str] = [rsync_binary, *rsync_args]
    cmd += [f"--include={pat}" for pat in extra_includes]
    cmd += [f"--exclude={pat}" for pat in extra_excludes]
    if bwlimit_kbps and bwlimit_kbps > 0:
        cmd.append(f"--bwlimit={bwlimit_kbps}")
    if dry_run:
        cmd.append("--dry-run")
    if link_dest:
        cmd.append(f"--link-dest={link_dest}")
    # ssh transport: a single shell-quoted string after -e.
    transport = " ".join(shlex.quote(part) for part in ssh_cmd)
    cmd += ["-e", transport, source, dest]
    return cmd
def run_rsync(command: list[str], log_path: Path, timeout_seconds: int) -> RsyncResult:
    """
    Run rsync and always write stdout/stderr to log_path.

    This function guarantees that log_path exists after it returns,
    unless an unrecoverable filesystem error occurred before process launch.

    A non-positive timeout_seconds disables the timeout. On timeout the child
    is killed by subprocess.run and exit code 124 is reported (same value the
    `timeout(1)` utility conventionally uses).
    """
    log_path.parent.mkdir(parents=True, exist_ok=True)
    # Ensure the file exists early, so callers can rely on it even if the
    # subprocess never starts.
    log_path.touch(exist_ok=True)
    try:
        with log_path.open("ab") as f:
            p = subprocess.run(
                command,
                stdout=f,
                stderr=subprocess.STDOUT,
                timeout=timeout_seconds if timeout_seconds > 0 else None,
            )
        return RsyncResult(exit_code=p.returncode, command=command)
    except subprocess.TimeoutExpired:
        # Fix: the exception was previously bound to an unused name (`as e`).
        # Append a marker to the log and report a conventional timeout code.
        with log_path.open("ab") as f:
            f.write(b"\n[pobsync] rsync timed out\n")
        return RsyncResult(exit_code=124, command=command)

78
src/pobsync/snapshot.py Normal file
View File

@@ -0,0 +1,78 @@
from __future__ import annotations
import secrets
import string
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
_ALPHABET = "ABCDEFGHJKLMNPQRSTUVWXYZ23456789" # base32-ish without ambiguous chars
def generate_id(length: int = 8) -> str:
return "".join(secrets.choice(_ALPHABET) for _ in range(length))
def utc_now() -> datetime:
    """Current UTC time, truncated to whole seconds."""
    now = datetime.now(timezone.utc)
    return now.replace(microsecond=0)
def format_dir_timestamp(ts: datetime) -> str:
    """Render *ts* in the directory-name form YYYYmmdd-HHMMSSZ."""
    return f"{ts:%Y%m%d-%H%M%SZ}"
def format_iso_z(ts: datetime) -> str:
    """ISO-8601 string for *ts* with a 'Z' suffix instead of '+00:00'."""
    text = ts.isoformat()
    return text.replace("+00:00", "Z")
def parse_dir_timestamp(s: str) -> datetime | None:
    """Parse YYYYmmdd-HHMMSSZ; return None when *s* is not in that form.

    Used only for best-effort sorting of snapshot directory names.
    """
    if not s.endswith("Z"):
        return None
    try:
        parsed = datetime.strptime(s, "%Y%m%d-%H%M%SZ")
    except ValueError:
        return None
    return parsed.replace(tzinfo=timezone.utc)
@dataclass(frozen=True)
class HostBackupDirs:
    """Well-known subdirectories beneath one host's backup root."""

    # <backup_root>/<host>
    root: Path

    @property
    def scheduled(self) -> Path:
        """Where completed scheduled snapshots live."""
        return self.root.joinpath("scheduled")

    @property
    def manual(self) -> Path:
        """Where completed manual snapshots live."""
        return self.root.joinpath("manual")

    @property
    def incomplete(self) -> Path:
        """Hidden staging area for in-progress snapshots."""
        return self.root.joinpath(".incomplete")
def snapshot_dir_name(ts: datetime, snap_id: str) -> str:
    """Directory name for a snapshot: '<timestamp>__<id>'."""
    stamp = format_dir_timestamp(ts)
    return "__".join((stamp, snap_id))
def list_snapshot_dirs(parent: Path) -> list[Path]:
    """All immediate subdirectories of *parent*, sorted; [] when absent."""
    if not parent.exists():
        return []
    return sorted(entry for entry in parent.iterdir() if entry.is_dir())
def extract_ts_and_id_from_dirname(name: str) -> tuple[datetime | None, str | None]:
    """Split '<ts>__<id>[--<label>]' into (timestamp, id).

    Either part is None when it cannot be recovered. Manual snapshots may
    append '--<label>' after the id; that suffix is stripped.
    """
    if "__" not in name:
        return None, None
    ts_text, tail = name.split("__", 1)
    ts = parse_dir_timestamp(ts_text)
    snap_id = tail.split("--", 1)[0]
    if not snap_id:
        return ts, None
    return ts, snap_id

View File

@@ -18,10 +18,14 @@ def is_tty() -> bool:
def sanitize_host(host: str) -> str:
if not HOST_RE.match(host):
raise ValueError(f"Invalid host name: {host!r}. Allowed: [A-Za-z0-9._-]+")
# Import locally to avoid import cycles at module import time
from .errors import ConfigError
raise ConfigError(f"Invalid host name: {host!r}. Allowed: [A-Za-z0-9._-]+")
return host
def ensure_dir(path: Path, mode: int = 0o750) -> None:
path.mkdir(parents=True, exist_ok=True)
try: