Commit first skeleton code for pobsync

This commit is contained in:
2026-02-02 22:15:54 +01:00
commit bc5fb21762
15 changed files with 1036 additions and 0 deletions

41
README.md Normal file
View File

@@ -0,0 +1,41 @@
# pobsync
`pobsync` is a pull-based backup tool for sysadmins.
It creates rsync-based snapshots with hardlinking (`--link-dest`) and stores them centrally on a backup server.
Backups are **pulled over SSH**, not pushed, and are designed to be run from cron or manually.
---
## Design overview
- Runtime, config, logs and state live under **`/opt/pobsync`**
- Backup data itself is stored under a configurable **`backup_root`** (e.g. `/srv/backups`)
- Two snapshot types:
- **scheduled**
Participates in retention pruning (daily / weekly / monthly / yearly)
- **manual**
Kept outside the scheduled prune chain, defaults to hardlinking from the latest scheduled snapshot
- Minimal dependencies (currently only `PyYAML`)
---
## Requirements
- Python **3.11+**
- `rsync`
- `ssh`
- Root or sudo access on the backup server
- SSH keys already configured between backup server and remotes
---
## Installation (system-wide, no venv)
This assumes you are installing as root or via sudo.
From the repository root:
```bash
python3 -m pip install --upgrade pip
sudo python3 -m pip install .
```

21
pyproject.toml Normal file
View File

@@ -0,0 +1,21 @@
[build-system]
requires = ["setuptools>=68", "wheel"]
build-backend = "setuptools.build_meta"
[project]
name = "pobsync"
version = "0.1.0"
description = "Pull-based rsync backup tool with hardlinked snapshots"
requires-python = ">=3.11"
dependencies = [
"PyYAML>=6.0"
]
[project.scripts]
pobsync = "pobsync.cli:main"
[tool.setuptools]
package-dir = {"" = "src"}
[tool.setuptools.packages.find]
where = ["src"]

3
src/pobsync/__init__.py Normal file
View File

@@ -0,0 +1,3 @@
# Public API of the package: only the version string is exported.
__all__ = ["__version__"]
# Keep in sync with the [project] version in pyproject.toml.
__version__ = "0.1.0"

5
src/pobsync/__main__.py Normal file
View File

@@ -0,0 +1,5 @@
from .cli import main

# Make `python -m pobsync` behave exactly like the installed `pobsync` console script.
if __name__ == "__main__":
    raise SystemExit(main())

189
src/pobsync/cli.py Normal file
View File

@@ -0,0 +1,189 @@
from __future__ import annotations
import argparse
import json
from pathlib import Path
from typing import Any
from .commands.doctor import run_doctor
from .commands.init_host import run_init_host
from .commands.install import run_install
from .errors import ConfigError, DoctorError, InstallError, PobsyncError
from .paths import PobsyncPaths
from .util import is_tty, to_json_safe
def build_parser() -> argparse.ArgumentParser:
    """Construct the top-level parser with the install / init-host / doctor subcommands."""
    parser = argparse.ArgumentParser(prog="pobsync")
    parser.add_argument("--prefix", default="/opt/pobsync", help="Pobsync home directory (default: /opt/pobsync)")
    parser.add_argument("--json", action="store_true", help="Machine-readable JSON output")
    subcommands = parser.add_subparsers(dest="command", required=True)

    # "install" bootstraps the /opt/pobsync layout and global config.
    install = subcommands.add_parser("install", help="Bootstrap /opt/pobsync layout and create global config")
    install.add_argument("--backup-root", help="Backup root directory (e.g. /srv/backups)")
    install.add_argument("--retention", default="daily=14,weekly=8,monthly=12,yearly=0", help="Default retention for init-host")
    install.add_argument("--force", action="store_true", help="Overwrite existing global config")
    install.add_argument("--dry-run", action="store_true", help="Show actions, do not write")
    install.set_defaults(_handler=cmd_install)

    # "init-host" writes a per-host YAML config file.
    init_host = subcommands.add_parser("init-host", help="Create a host config YAML under config/hosts")
    init_host.add_argument("host", help="Host name (used as filename)")
    init_host.add_argument("--address", help="Hostname or IP of the remote")
    init_host.add_argument("--ssh-user", default=None)
    init_host.add_argument("--ssh-port", type=int, default=None)
    init_host.add_argument("--retention", default=None, help="Override retention for this host (daily=...,weekly=...)")
    init_host.add_argument("--exclude-add", action="append", default=[], help="Additional excludes (repeatable)")
    init_host.add_argument("--exclude-replace", action="append", default=None, help="Replace excludes list (repeatable)")
    init_host.add_argument("--include", action="append", default=[], help="Include patterns (repeatable)")
    init_host.add_argument("--force", action="store_true")
    init_host.add_argument("--dry-run", action="store_true")
    init_host.set_defaults(_handler=cmd_init_host)

    # "doctor" validates the installation and, optionally, one host config.
    doctor = subcommands.add_parser("doctor", help="Validate installation and configuration")
    doctor.add_argument("host", nargs="?", default=None, help="Optional host to validate")
    doctor.add_argument("--connect", action="store_true", help="Try SSH connectivity check (phase 2)")
    doctor.add_argument("--rsync-dry-run", action="store_true", help="Try rsync dry run (phase 2)")
    doctor.set_defaults(_handler=cmd_doctor)
    return parser
def parse_retention(s: str) -> dict[str, int]:
    """
    Parse a retention spec of the form ``daily=14,weekly=8,monthly=12,yearly=0``.

    Buckets not mentioned default to 0 (all four keys are always present in
    the result). Raises ValueError on unknown keys, non-integer or negative
    counts, malformed components, or a bucket specified twice.
    """
    out: dict[str, int] = {}
    for part in (p.strip() for p in s.split(",") if p.strip()):
        if "=" not in part:
            raise ValueError(f"Invalid retention component: {part!r}")
        k, v = part.split("=", 1)
        k = k.strip()
        v = v.strip()
        if k not in {"daily", "weekly", "monthly", "yearly"}:
            raise ValueError(f"Invalid retention key: {k!r}")
        if k in out:
            # A duplicated bucket almost certainly means a typo; fail loudly
            # instead of silently keeping only the last value.
            raise ValueError(f"Duplicate retention key: {k!r}")
        try:
            n = int(v)
        except ValueError:
            # int("abc") raises with an unhelpful message; re-raise with context.
            raise ValueError(f"Invalid retention count for {k}: {v!r}") from None
        if n < 0:
            raise ValueError(f"Retention must be >= 0 for {k}")
        out[k] = n
    # Ensure all keys exist (default missing to 0)
    for k in ("daily", "weekly", "monthly", "yearly"):
        out.setdefault(k, 0)
    return out
def _print(result: dict[str, Any], as_json: bool) -> None:
    """Render a command result either as indented JSON or as minimal human-readable lines."""
    if as_json:
        print(json.dumps(to_json_safe(result), indent=2, sort_keys=False))
        return
    # Human output (phase 1): one status line, then action / check detail lines.
    print("OK" if result.get("ok") is True else "FAILED")
    for action in result.get("actions", []):
        print(f"- {action}")
    for check in result.get("results", []):
        label = "OK" if check.get("ok", False) else "FAIL"
        name = check.get("check", "check")
        msg = check.get("message") or check.get("error") or ""
        extra = ""
        # First matching detail key wins: path, then name, then host.
        for detail in ("path", "name", "host"):
            if detail in check:
                extra = f" ({check[detail]})"
                break
        print(f"- {label} {name}{extra} {msg}".rstrip())
def cmd_install(args: argparse.Namespace) -> int:
    """Handle `pobsync install`: bootstrap the layout and write the global config.

    Returns the process exit code (0 on success, 1 on failure).
    """
    prefix = Path(args.prefix)
    try:
        retention = parse_retention(args.retention)
    except ValueError as e:
        # Surface a bad --retention value as a predictable pobsync error so
        # main() prints a friendly message instead of a raw traceback.
        raise InstallError(f"Invalid --retention: {e}") from e
    backup_root = args.backup_root
    if backup_root is None and is_tty():
        # Interactive fallback: only prompt when attached to a terminal.
        backup_root = input("backup_root (absolute path, not '/'): ").strip() or None
    result = run_install(
        prefix=prefix,
        backup_root=backup_root,
        retention=retention,
        dry_run=bool(args.dry_run),
        force=bool(args.force),
    )
    _print(result, as_json=bool(args.json))
    return 0 if result.get("ok") else 1
def cmd_init_host(args: argparse.Namespace) -> int:
    """Handle `pobsync init-host`: write a per-host YAML config under config/hosts.

    Returns the process exit code (0 on success, 1 on failure). Raises
    ConfigError when no address is available from --address or interactive input.
    """
    prefix = Path(args.prefix)
    address = args.address
    if address is None and is_tty():
        # Interactive fallback: only prompt when attached to a terminal.
        address = input("address (hostname or ip): ").strip() or None
    if not address:
        raise ConfigError("--address is required (or interactive input)")
    if args.retention is None:
        # In phase 1 we require retention explicitly or via install default.
        # We'll read global.yaml if present to fetch retention_defaults.
        # Imported lazily so the config-loading machinery is only touched
        # when --retention was not given on the command line.
        from .config.load import load_global_config
        paths = PobsyncPaths(home=prefix)
        global_cfg = load_global_config(paths.global_config_path)
        # Fall back to hard-coded defaults if global.yaml carries no retention_defaults.
        retention = global_cfg.get("retention_defaults") or {"daily": 14, "weekly": 8, "monthly": 12, "yearly": 0}
    else:
        retention = parse_retention(args.retention)
    # None means "keep defaults plus --exclude-add"; a list replaces excludes outright.
    excludes_replace = args.exclude_replace if args.exclude_replace is not None else None
    result = run_init_host(
        prefix=prefix,
        host=args.host,
        address=address,
        retention=retention,
        ssh_user=args.ssh_user,
        ssh_port=args.ssh_port,
        excludes_add=list(args.exclude_add),
        excludes_replace=excludes_replace,
        includes=list(args.include),
        dry_run=bool(args.dry_run),
        force=bool(args.force),
    )
    _print(result, as_json=bool(args.json))
    return 0 if result.get("ok") else 1
def cmd_doctor(args: argparse.Namespace) -> int:
    """Handle `pobsync doctor`: run validation checks and report the outcome."""
    report = run_doctor(
        prefix=Path(args.prefix),
        host=args.host,
        connect=bool(args.connect),
        rsync_dry_run=bool(args.rsync_dry_run),
    )
    _print(report, as_json=bool(args.json))
    return 0 if report.get("ok") else 1
def main(argv: list[str] | None = None) -> int:
    """CLI entry point: parse arguments and dispatch to the selected handler.

    Predictable pobsync errors exit with code 1 and Ctrl-C with 130, in both
    cases without a traceback; --json failures are reported as JSON objects.
    """
    args = build_parser().parse_args(argv)
    try:
        return int(args._handler(args))
    except PobsyncError as exc:
        if args.json:
            _print({"ok": False, "error": str(exc), "type": type(exc).__name__}, as_json=True)
        else:
            print(f"ERROR: {exc}")
        return 1
    except KeyboardInterrupt:
        if args.json:
            _print({"ok": False, "error": "interrupted"}, as_json=True)
        else:
            print("ERROR: interrupted")
        return 130

View File

@@ -0,0 +1,111 @@
from __future__ import annotations
import os
import shutil
from pathlib import Path
from typing import Any
from ..config.load import load_global_config, load_host_config
from ..errors import DoctorError
from ..paths import PobsyncPaths
from ..util import is_absolute_non_root
def _check_binary(name: str) -> tuple[bool, str]:
    """Return (found, message) for a binary looked up on PATH via shutil.which."""
    location = shutil.which(name)
    if not location:
        return False, f"missing binary: {name}"
    return True, f"ok: {name} -> {location}"
def _check_writable_dir(path: Path) -> tuple[bool, str]:
    """Return (ok, message) after checking *path* can be created and written.

    NOTE: intentionally creates the directory as a side effect, then probes
    writability with a small throw-away file.
    """
    try:
        path.mkdir(parents=True, exist_ok=True)
    except OSError as e:
        return False, f"cannot create dir {path}: {e}"
    probe = path / ".pobsync_write_test"
    try:
        probe.write_text("test", encoding="utf-8")
        probe.unlink(missing_ok=True)
    except OSError as e:
        return False, f"not writable: {path}: {e}"
    return True, f"ok: writable {path}"
def run_doctor(prefix: Path, host: str | None, connect: bool, rsync_dry_run: bool) -> dict[str, Any]:
    """Run phase-1 health checks and return ``{"ok": bool, "results": [...]}``.

    Checks, in order: directory layout, global config loadability, presence
    of the rsync/ssh binaries, backup_root validity and writability, and
    (optionally) a single host config. Failed checks never raise; the CLI
    maps ``ok=False`` to exit code 1.
    """
    # Phase 1 doctor does not perform network checks yet (connect/rsync_dry_run acknowledged).
    paths = PobsyncPaths(home=prefix)
    results: list[dict[str, Any]] = []
    ok = True
    # Check required layout
    for d in (paths.config_dir, paths.hosts_dir, paths.state_dir, paths.locks_dir, paths.logs_dir):
        exists = d.exists()
        results.append({"check": "path_exists", "path": str(d), "ok": exists})
        if not exists:
            ok = False
    # Load and validate global config
    global_cfg: dict[str, Any] | None = None
    if paths.global_config_path.exists():
        try:
            global_cfg = load_global_config(paths.global_config_path)
            results.append({"check": "global_config", "path": str(paths.global_config_path), "ok": True})
        except Exception as e:
            # Broad on purpose: any parse/validation failure becomes a report entry.
            ok = False
            results.append({"check": "global_config", "path": str(paths.global_config_path), "ok": False, "error": str(e)})
    else:
        ok = False
        results.append({"check": "global_config", "path": str(paths.global_config_path), "ok": False, "error": "missing"})
    # Basic binaries
    b1, m1 = _check_binary("rsync")
    results.append({"check": "binary", "name": "rsync", "ok": b1, "message": m1})
    ok = ok and b1
    b2, m2 = _check_binary("ssh")
    results.append({"check": "binary", "name": "ssh", "ok": b2, "message": m2})
    ok = ok and b2
    # backup_root checks
    if global_cfg is not None:
        backup_root = global_cfg.get("backup_root")
        if isinstance(backup_root, str) and is_absolute_non_root(backup_root):
            br = Path(backup_root)
            # NOTE: _check_writable_dir creates backup_root if it does not exist yet.
            w_ok, w_msg = _check_writable_dir(br)
            results.append({"check": "backup_root", "path": str(br), "ok": w_ok, "message": w_msg})
            ok = ok and w_ok
        else:
            ok = False
            results.append({"check": "backup_root", "ok": False, "error": "invalid backup_root"})
    else:
        # Global config failed to load above, so ok is already False here.
        results.append({"check": "backup_root", "ok": False, "error": "global config not loaded"})
    # host checks
    if host is not None:
        host_path = paths.hosts_dir / f"{host}.yaml"
        if not host_path.exists():
            ok = False
            results.append({"check": "host_config", "host": host, "ok": False, "error": f"missing {host_path}"})
        else:
            try:
                _ = load_host_config(host_path)
                results.append({"check": "host_config", "host": host, "ok": True, "path": str(host_path)})
            except Exception as e:
                ok = False
                results.append({"check": "host_config", "host": host, "ok": False, "path": str(host_path), "error": str(e)})
    # Phase 1: report that connect/rsync_dry_run are not implemented yet
    if connect:
        results.append({"check": "connect", "ok": False, "error": "not implemented in phase 1"})
        ok = False
    if rsync_dry_run:
        results.append({"check": "rsync_dry_run", "ok": False, "error": "not implemented in phase 1"})
        ok = False
    if not ok:
        # Do not raise; return structured report. CLI will map to exit code 1.
        return {"ok": False, "results": results}
    return {"ok": True, "results": results}

View File

@@ -0,0 +1,82 @@
from __future__ import annotations
from pathlib import Path
from typing import Any
import yaml
from ..errors import ConfigError
from ..paths import PobsyncPaths
from ..util import sanitize_host
def build_host_config(
    host: str,
    address: str,
    retention: dict[str, int],
    ssh_user: str | None = None,
    ssh_port: int | None = None,
    excludes_add: list[str] | None = None,
    excludes_replace: list[str] | None = None,
    includes: list[str] | None = None,
) -> dict[str, Any]:
    """Assemble the mapping that gets dumped to config/hosts/<host>.yaml."""
    cfg: dict[str, Any] = {
        "host": host,
        "address": address,
        "retention": retention,
        "includes": includes or [],
    }
    # Only emit an "ssh" section when at least one override was supplied.
    ssh_overrides: dict[str, Any] = {}
    if ssh_user is not None:
        ssh_overrides["user"] = ssh_user
    if ssh_port is not None:
        ssh_overrides["port"] = ssh_port
    if ssh_overrides:
        cfg["ssh"] = ssh_overrides
    # excludes_replace wins outright; exactly one of the two keys is written.
    if excludes_replace is None:
        cfg["excludes_add"] = excludes_add or []
    else:
        cfg["excludes_replace"] = excludes_replace
    return cfg
def run_init_host(
    prefix: Path,
    host: str,
    address: str,
    retention: dict[str, int],
    ssh_user: str | None,
    ssh_port: int | None,
    excludes_add: list[str],
    excludes_replace: list[str] | None,
    includes: list[str],
    dry_run: bool,
    force: bool,
) -> dict[str, Any]:
    """Write (or, with dry_run, pretend to write) config/hosts/<host>.yaml.

    Raises ConfigError for an invalid host name, a missing installation
    layout, an already-existing config without force, or write failures —
    keeping every failure inside the PobsyncError hierarchy the CLI handles.
    """
    try:
        host = sanitize_host(host)
    except ValueError as e:
        # Map the low-level validation error onto the CLI-facing hierarchy.
        raise ConfigError(str(e)) from e
    paths = PobsyncPaths(home=prefix)
    if not paths.hosts_dir.is_dir():
        # Without this, write_text below would die with a raw FileNotFoundError.
        raise ConfigError(f"Missing hosts directory: {paths.hosts_dir} (run 'pobsync install' first)")
    target = paths.hosts_dir / f"{host}.yaml"
    if target.exists() and not force:
        raise ConfigError(f"Host config already exists: {target} (use --force to overwrite)")
    cfg = build_host_config(
        host=host,
        address=address,
        retention=retention,
        ssh_user=ssh_user,
        ssh_port=ssh_port,
        excludes_add=excludes_add,
        excludes_replace=excludes_replace,
        includes=includes,
    )
    action: str
    if dry_run:
        action = f"would write {target}"
    else:
        try:
            target.write_text(yaml.safe_dump(cfg, sort_keys=False), encoding="utf-8")
        except OSError as e:
            raise ConfigError(f"Cannot write host config {target}: {e}") from e
        action = f"wrote {target}"
    return {"ok": True, "action": action, "host_config": str(target)}

View File

@@ -0,0 +1,130 @@
from __future__ import annotations
from pathlib import Path
from typing import Any
import yaml
from ..errors import InstallError
from ..paths import PobsyncPaths
from ..util import ensure_dir, is_absolute_non_root
# rsync filter rules excluded by default: pseudo-filesystems, caches and
# other paths that must never be backed up from a live system.
DEFAULT_EXCLUDES = [
    "/proc/***",
    "/sys/***",
    "/dev/***",
    "/run/***",
    "/tmp/***",
    "/mnt/***",
    "/media/***",
    "/lost+found/***",
    "/var/cache/***",
    "/var/tmp/***",
    "/var/run/***",
    "/var/lock/***",
    "/swapfile",
    "/.snapshots/***",
]

# Baseline rsync invocation used for every snapshot.
DEFAULT_RSYNC_ARGS = [
    "--archive",
    "--numeric-ids",
    "--delete",
    "--delete-excluded",
    "--partial",
    "--partial-dir=.rsync-partial",
    "--inplace",
    "--one-file-system",
    "--relative",
    "--human-readable",
    "--stats",
]


def build_default_global_config(pobsync_home: Path, backup_root: str, retention: dict[str, int]) -> dict[str, Any]:
    """Produce the initial contents of global.yaml for a fresh installation."""
    ssh_section: dict[str, Any] = {
        "user": "root",
        "port": 22,
        "options": [
            "-oBatchMode=yes",
            "-oStrictHostKeyChecking=accept-new",
        ],
    }
    rsync_section: dict[str, Any] = {
        "binary": "rsync",
        "args": DEFAULT_RSYNC_ARGS,
        "timeout_seconds": 0,
        "bwlimit_kbps": 0,
        "extra_args": [],
    }
    cfg: dict[str, Any] = {
        "backup_root": backup_root,
        "pobsync_home": str(pobsync_home),
        "ssh": ssh_section,
        "rsync": rsync_section,
        "defaults": {
            "source_root": "/",
            "destination_subdir": "",
        },
        "excludes_default": DEFAULT_EXCLUDES,
        "logging": {
            "file": str(pobsync_home / "logs" / "pobsync.log"),
            "level": "INFO",
        },
        "output": {
            "default_format": "human",
        },
    }
    # We store default retention here for init-host convenience; host config still requires retention.
    cfg["retention_defaults"] = retention
    return cfg
def install_layout(paths: PobsyncPaths, dry_run: bool) -> list[str]:
    """Create the pobsync directory tree; return the actions taken (or planned)."""
    wanted = (paths.home, paths.config_dir, paths.hosts_dir, paths.state_dir, paths.locks_dir, paths.logs_dir)
    actions: list[str] = []
    for directory in wanted:
        actions.append(f"mkdir -p {directory}")
        if not dry_run:
            ensure_dir(directory)
    return actions
def write_yaml(path: Path, data: dict[str, Any], dry_run: bool, force: bool) -> str:
    """Write *data* as YAML to *path*, honoring dry-run and overwrite rules.

    - existing file, no force: left untouched ("skip existing ...")
    - existing file, force: backed up to <path>.bak, then overwritten
    - otherwise: written fresh

    Returns a human-readable action string describing what happened.
    """
    if path.exists() and not force:
        return f"skip existing {path}"
    if path.exists() and force:
        bak = path.with_suffix(path.suffix + ".bak")
        if not dry_run:
            bak.write_text(path.read_text(encoding="utf-8"), encoding="utf-8")
            # BUGFIX: the original returned right after the backup without ever
            # writing the new data, so --force silently kept the old config.
            path.write_text(yaml.safe_dump(data, sort_keys=False), encoding="utf-8")
        return f"overwrite {path} (backup {bak})"
    if not dry_run:
        path.write_text(yaml.safe_dump(data, sort_keys=False), encoding="utf-8")
    return f"write {path}"
def run_install(
    prefix: Path,
    backup_root: str | None,
    retention: dict[str, int],
    dry_run: bool,
    force: bool,
) -> dict[str, Any]:
    """Bootstrap the pobsync home layout and write the global config.

    Raises InstallError when backup_root is missing, relative, or '/'.
    """
    if backup_root is None:
        raise InstallError("backup_root is required (use --backup-root or interactive mode)")
    if not is_absolute_non_root(backup_root):
        raise InstallError("backup_root must be an absolute path and must not be '/'")
    paths = PobsyncPaths(home=prefix)
    actions = install_layout(paths, dry_run=dry_run)
    cfg = build_default_global_config(paths.home, backup_root=backup_root, retention=retention)
    actions.append(write_yaml(paths.global_config_path, cfg, dry_run=dry_run, force=force))
    return {
        "ok": True,
        "actions": actions,
        "paths": {"home": str(paths.home), "global_config": str(paths.global_config_path)},
    }

View File

@@ -0,0 +1,53 @@
from __future__ import annotations
from pathlib import Path
from typing import Any
import yaml
from ..errors import ConfigError, ValidationError
from ..validate import validate_dict
from .schemas import GLOBAL_SCHEMA, HOST_SCHEMA
def load_yaml_file(path: Path) -> dict[str, Any]:
    """Read *path* and parse it as YAML, insisting on a mapping at the root.

    An empty file yields {}. Every failure mode is raised as ConfigError.
    """
    if not path.exists():
        raise ConfigError(f"Missing config file: {path}")
    try:
        text = path.read_text(encoding="utf-8")
    except OSError as e:
        raise ConfigError(f"Cannot read config file: {path}: {e}") from e
    try:
        parsed = yaml.safe_load(text)
    except yaml.YAMLError as e:
        raise ConfigError(f"Invalid YAML in {path}: {e}") from e
    if parsed is None:
        return {}
    if not isinstance(parsed, dict):
        raise ConfigError(f"Config root must be a mapping in {path}")
    return parsed
def load_global_config(path: Path) -> dict[str, Any]:
    """Load and schema-validate the global config; raise ConfigError on failure."""
    raw = load_yaml_file(path)
    try:
        return validate_dict(raw, GLOBAL_SCHEMA, path="global")
    except ValidationError as e:
        raise ConfigError(f"Invalid global config at {path}: {format_validation_error(e)}") from e
def load_host_config(path: Path) -> dict[str, Any]:
    """Load and schema-validate one host config; raise ConfigError on failure."""
    raw = load_yaml_file(path)
    try:
        return validate_dict(raw, HOST_SCHEMA, path="host")
    except ValidationError as e:
        raise ConfigError(f"Invalid host config at {path}: {format_validation_error(e)}") from e
def format_validation_error(err: ValidationError) -> str:
    """Render a ValidationError, prefixing its dotted config path when one is set."""
    return f"{err.path}: {err}" if err.path else str(err)

View File

@@ -0,0 +1,53 @@
from __future__ import annotations
from typing import Any
def deep_merge(base: dict[str, Any], override: dict[str, Any]) -> dict[str, Any]:
    """
    Deep merge dictionaries:
    - dict values are merged recursively
    - non-dict values are replaced
    Lists are replaced by caller explicitly (we avoid implicit list merge).
    """
    out: dict[str, Any] = dict(base)
    for k, v in override.items():
        if k in out and isinstance(out[k], dict) and isinstance(v, dict):
            out[k] = deep_merge(out[k], v)
        else:
            out[k] = v
    return out


def compute_effective_excludes(global_cfg: dict[str, Any], host_cfg: dict[str, Any]) -> list[str]:
    """Host excludes_replace wins outright; otherwise global defaults + excludes_add."""
    if host_cfg.get("excludes_replace") is not None:
        return list(host_cfg["excludes_replace"])
    return list(global_cfg.get("excludes_default", [])) + list(host_cfg.get("excludes_add", []))


def build_effective_config(global_cfg: dict[str, Any], host_cfg: dict[str, Any]) -> dict[str, Any]:
    """Merge global and host config into one effective mapping.

    Adds derived keys: source_root (defaulted from global.defaults),
    excludes_effective, and rsync.args_effective. Neither input dict is
    mutated by this function.
    """
    merged = deep_merge(global_cfg, host_cfg)
    # Apply defaults for host source_root from global.defaults.source_root
    if host_cfg.get("source_root") is None:
        merged.setdefault("defaults", {})
        merged["source_root"] = merged.get("defaults", {}).get("source_root", "/")
    merged["excludes_effective"] = compute_effective_excludes(global_cfg, host_cfg)
    # BUGFIX: copy the rsync section before annotating it. deep_merge only
    # allocates new dicts where *both* inputs have one, so when only one side
    # defines "rsync" the merged value aliases that caller's dict — writing
    # args_effective into it would mutate global_cfg (or host_cfg) in place.
    rsync_cfg: dict[str, Any] = dict(merged.get("rsync") or {})
    g_rsync = global_cfg.get("rsync", {}) or {}
    h_rsync = host_cfg.get("rsync", {}) or {}
    # Effective rsync args: global rsync.args + global rsync.extra_args + host rsync.extra_args
    rsync_cfg["args_effective"] = (
        list(g_rsync.get("args", []))
        + list(g_rsync.get("extra_args", []))
        + list(h_rsync.get("extra_args", []))
    )
    merged["rsync"] = rsync_cfg
    return merged

View File

@@ -0,0 +1,104 @@
from __future__ import annotations
import re
from ..validate import FieldSpec, Schema
# Host names double as file names (config/hosts/<host>.yaml), so keep the
# allowed character set conservative.
HOST_RE = re.compile(r"^[A-Za-z0-9._-]+$")

# "ssh" section (shared by global and host configs): connection overrides.
SSH_SCHEMA = Schema(
    fields={
        "user": FieldSpec(str, required=False),
        "port": FieldSpec(int, required=False, min_value=1, max_value=65535),
        "options": FieldSpec(list, required=False, default=[], item=FieldSpec(str)),
    },
    allow_unknown=False,
)

# Global "rsync" section: binary, baseline args, limits, extra args.
RSYNC_SCHEMA = Schema(
    fields={
        "binary": FieldSpec(str, required=False, default="rsync"),
        "args": FieldSpec(list, required=False, default=[], item=FieldSpec(str)),
        "timeout_seconds": FieldSpec(int, required=False, default=0, min_value=0),
        "bwlimit_kbps": FieldSpec(int, required=False, default=0, min_value=0),
        "extra_args": FieldSpec(list, required=False, default=[], item=FieldSpec(str)),
    },
    allow_unknown=False,
)

# Global "defaults" section: fallbacks applied to hosts that omit the values.
DEFAULTS_SCHEMA = Schema(
    fields={
        "source_root": FieldSpec(str, required=False, default="/"),
        "destination_subdir": FieldSpec(str, required=False, default=""),
    },
    allow_unknown=False,
)

# Global "logging" section.
LOGGING_SCHEMA = Schema(
    fields={
        "file": FieldSpec(str, required=False),
        "level": FieldSpec(
            str,
            required=False,
            default="INFO",
            enum={"DEBUG", "INFO", "WARNING", "ERROR"},
        ),
    },
    allow_unknown=False,
)

# Global "output" section.
OUTPUT_SCHEMA = Schema(
    fields={
        "default_format": FieldSpec(str, required=False, default="human", enum={"human", "json"}),
    },
    allow_unknown=False,
)

# Top-level schema for config/global.yaml; only backup_root is mandatory.
GLOBAL_SCHEMA = Schema(
    fields={
        "backup_root": FieldSpec(str, required=True),
        "pobsync_home": FieldSpec(str, required=False, default="/opt/pobsync"),
        "ssh": FieldSpec(dict, required=False, schema=SSH_SCHEMA),
        "rsync": FieldSpec(dict, required=False, schema=RSYNC_SCHEMA),
        "defaults": FieldSpec(dict, required=False, schema=DEFAULTS_SCHEMA),
        "excludes_default": FieldSpec(list, required=False, default=[], item=FieldSpec(str)),
        "logging": FieldSpec(dict, required=False, schema=LOGGING_SCHEMA),
        "output": FieldSpec(dict, required=False, schema=OUTPUT_SCHEMA),
    },
    allow_unknown=False,
)

# Host "retention" section: all four buckets are mandatory and non-negative.
RETENTION_SCHEMA = Schema(
    fields={
        "daily": FieldSpec(int, required=True, min_value=0),
        "weekly": FieldSpec(int, required=True, min_value=0),
        "monthly": FieldSpec(int, required=True, min_value=0),
        "yearly": FieldSpec(int, required=True, min_value=0),
    },
    allow_unknown=False,
)

# Host "rsync" section: hosts may only append extra args, not replace the baseline.
HOST_RSYNC_SCHEMA = Schema(
    fields={
        "extra_args": FieldSpec(list, required=False, default=[], item=FieldSpec(str)),
    },
    allow_unknown=False,
)

# Top-level schema for config/hosts/<host>.yaml.
HOST_SCHEMA = Schema(
    fields={
        "host": FieldSpec(str, required=True, regex=HOST_RE),
        "address": FieldSpec(str, required=True),
        "ssh": FieldSpec(dict, required=False, schema=SSH_SCHEMA),
        "source_root": FieldSpec(str, required=False),
        "includes": FieldSpec(list, required=False, default=[], item=FieldSpec(str)),
        "excludes_add": FieldSpec(list, required=False, default=[], item=FieldSpec(str)),
        "excludes_replace": FieldSpec(list, required=False, item=FieldSpec(str)),
        "retention": FieldSpec(dict, required=True, schema=RETENTION_SCHEMA),
        "rsync": FieldSpec(dict, required=False, schema=HOST_RSYNC_SCHEMA),
    },
    allow_unknown=False,
)

23
src/pobsync/errors.py Normal file
View File

@@ -0,0 +1,23 @@
class PobsyncError(Exception):
    """Root of the pobsync exception hierarchy; the CLI catches this for clean exits."""


class ConfigError(PobsyncError):
    """Raised when configuration is invalid or missing."""


class ValidationError(PobsyncError):
    """Raised when schema validation fails.

    Carries an optional dotted *path* locating the offending config key.
    """

    def __init__(self, message: str, path: str | None = None):
        super().__init__(message)
        # Dotted location within the config mapping (e.g. "ssh.port"), or None.
        self.path = path


class InstallError(PobsyncError):
    """Raised when installation/bootstrap fails."""


class DoctorError(PobsyncError):
    """Raised when doctor detects fatal issues."""

38
src/pobsync/paths.py Normal file
View File

@@ -0,0 +1,38 @@
from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
@dataclass(frozen=True)
class PobsyncPaths:
    """Derives every well-known pobsync location from a single home directory."""

    # Installation prefix, usually /opt/pobsync.
    home: Path

    @property
    def config_dir(self) -> Path:
        """<home>/config — global.yaml plus the hosts/ subdirectory."""
        return self.home / "config"

    @property
    def hosts_dir(self) -> Path:
        """<home>/config/hosts — one YAML file per backed-up host."""
        return self.config_dir / "hosts"

    @property
    def state_dir(self) -> Path:
        """<home>/state — runtime state such as lock files."""
        return self.home / "state"

    @property
    def locks_dir(self) -> Path:
        """<home>/state/locks."""
        return self.state_dir / "locks"

    @property
    def logs_dir(self) -> Path:
        """<home>/logs."""
        return self.home / "logs"

    @property
    def global_config_path(self) -> Path:
        """<home>/config/global.yaml."""
        return self.config_dir / "global.yaml"

    @property
    def central_log_path(self) -> Path:
        """<home>/logs/pobsync.log."""
        return self.logs_dir / "pobsync.log"

60
src/pobsync/util.py Normal file
View File

@@ -0,0 +1,60 @@
from __future__ import annotations
import os
import re
from pathlib import Path
from typing import Any
# Host names double as file names, so keep the character set conservative.
HOST_RE = re.compile(r"^[A-Za-z0-9._-]+$")


def is_tty() -> bool:
    """True when stdin (fd 0) is a terminal; used to gate interactive prompts."""
    try:
        result = os.isatty(0)
    except OSError:
        return False
    return result
def sanitize_host(host: str) -> str:
    """Validate that *host* is safe to use as a file name component.

    Returns the name unchanged; raises ValueError for anything outside the
    conservative [A-Za-z0-9._-]+ character set.
    """
    if re.match(r"^[A-Za-z0-9._-]+$", host) is None:
        raise ValueError(f"Invalid host name: {host!r}. Allowed: [A-Za-z0-9._-]+")
    return host
def ensure_dir(path: Path, mode: int = 0o750) -> None:
    """Create *path* (and parents) if needed, then best-effort chmod to *mode*.

    A PermissionError from chmod is deliberately swallowed: permissions of
    pre-existing directories may be controlled by the admin or umask.
    """
    path.mkdir(parents=True, exist_ok=True)
    try:
        os.chmod(path, mode)
    except PermissionError:
        pass
def is_absolute_non_root(path: str) -> bool:
    """True for absolute paths other than the filesystem root itself."""
    candidate = Path(path)
    if not candidate.is_absolute():
        return False
    return str(candidate) != "/"
def realpath_startswith(target: Path, allowed_root: Path) -> bool:
    """True when *target*, fully resolved, lies at or below the resolved *allowed_root*."""
    resolved_target = target.resolve()
    resolved_root = allowed_root.resolve()
    try:
        resolved_target.relative_to(resolved_root)
    except ValueError:
        return False
    return True
def to_json_safe(obj: Any) -> Any:
    """
    Recursively convert *obj* into JSON-serializable primitives.

    dicts get stringified keys, lists and tuples become JSON arrays,
    str/int/float/bool/None pass through, and anything else falls back to
    str() (e.g. Path, Exception).
    """
    if isinstance(obj, dict):
        return {str(k): to_json_safe(v) for k, v in obj.items()}
    if isinstance(obj, (list, tuple)):
        # Tuples previously fell through to str() and were emitted as
        # "(1, 2)"; treat them as JSON arrays like lists.
        return [to_json_safe(v) for v in obj]
    if isinstance(obj, (str, int, float, bool)) or obj is None:
        return obj
    return str(obj)

123
src/pobsync/validate.py Normal file
View File

@@ -0,0 +1,123 @@
from __future__ import annotations
import re
from dataclasses import dataclass
from typing import Any, Callable, Mapping, Sequence
from .errors import ValidationError
@dataclass(frozen=True)
class FieldSpec:
    """Declarative constraints for a single config field.

    When ``schema`` is set the value is validated as a nested mapping;
    otherwise the scalar/list rules below apply.
    """

    # Accepted Python type (or tuple of types) for the raw value.
    typ: type | tuple[type, ...]
    # Whether the key must be present in the input mapping.
    required: bool = False
    # Value injected when the key is absent (only applied when not None).
    default: Any = None
    # Whether an explicit null is accepted for this field.
    allow_none: bool = False
    # Closed set of allowed values, if any.
    enum: set[Any] | None = None
    # Pattern a string value must match, if any.
    regex: re.Pattern[str] | None = None
    # Inclusive lower/upper bounds for integer values.
    min_value: int | None = None
    max_value: int | None = None
    # Per-element spec applied to list values.
    item: "FieldSpec | None" = None
    # Nested schema applied to mapping values.
    schema: "Schema | None" = None
@dataclass(frozen=True)
class Schema:
    """A validated mapping: per-key field specs, unknown-key policy, extra validators."""

    # Mapping of key name -> constraints for that key.
    fields: dict[str, FieldSpec]
    # When False, keys not listed in ``fields`` are rejected.
    allow_unknown: bool = False
    # Callables (validated_data, path) -> None that may raise ValidationError.
    custom_validators: tuple[Callable[[dict[str, Any], str], None], ...] = ()
def _path_join(base: str, key: str) -> str:
    """Join a dotted config path with a key; an empty base yields just the key."""
    if not base:
        return key
    return f"{base}.{key}"
def validate_dict(data: Any, schema: Schema, path: str = "") -> dict[str, Any]:
if not isinstance(data, dict):
raise ValidationError(f"Expected mapping, got {type(data).__name__}", path=path)
out: dict[str, Any] = {}
if not schema.allow_unknown:
unknown = set(data.keys()) - set(schema.fields.keys())
if unknown:
keys = ", ".join(sorted(str(k) for k in unknown))
raise ValidationError(f"Unknown keys: {keys}", path=path)
for key, spec in schema.fields.items():
p = _path_join(path, key)
if key not in data:
if spec.required:
raise ValidationError("Missing required key", path=p)
if spec.default is not None or (spec.default is None and not spec.required):
# Default may be None; only apply if caller wants it.
if spec.default is not None:
out[key] = spec.default
continue
value = data[key]
if value is None:
if spec.allow_none:
out[key] = None
continue
raise ValidationError("Value cannot be null", path=p)
out[key] = validate_value(value, spec, p)
# Preserve unknown keys if allowed
if schema.allow_unknown:
for k, v in data.items():
if k not in out:
out[str(k)] = v
for fn in schema.custom_validators:
fn(out, path)
return out
def validate_value(value: Any, spec: FieldSpec, path: str) -> Any:
if spec.schema is not None:
return validate_dict(value, spec.schema, path=path)
if not isinstance(value, spec.typ):
raise ValidationError(
f"Expected {type_name(spec.typ)}, got {type(value).__name__}",
path=path,
)
if spec.enum is not None and value not in spec.enum:
allowed = ", ".join(sorted(repr(v) for v in spec.enum))
raise ValidationError(f"Invalid value. Allowed: {allowed}", path=path)
if isinstance(value, str) and spec.regex is not None:
if not spec.regex.match(value):
raise ValidationError("Value does not match required pattern", path=path)
if isinstance(value, int):
if spec.min_value is not None and value < spec.min_value:
raise ValidationError(f"Value must be >= {spec.min_value}", path=path)
if spec.max_value is not None and value > spec.max_value:
raise ValidationError(f"Value must be <= {spec.max_value}", path=path)
if isinstance(value, list):
if spec.item is None:
return value
validated: list[Any] = []
for idx, item in enumerate(value):
ipath = f"{path}[{idx}]"
if item is None and spec.item.allow_none:
validated.append(None)
else:
validated.append(validate_value(item, spec.item, ipath))
return validated
return value
def type_name(t: type | tuple[type, ...]) -> str:
if isinstance(t, tuple):
return " | ".join(x.__name__ for x in t)
return t.__name__