Compare commits

...

11 Commits

15 changed files with 1750 additions and 115 deletions

README.md

@@ -1,41 +1,112 @@
# pobsync

`pobsync` is a pull-based backup tool that runs on a central backup server and pulls data from remote servers via rsync over SSH.

Key points:

- All backup data lives on the backup server.
- Snapshots are rsync-based and use hardlinking (`--link-dest`) for space efficiency.
- Designed for scheduled runs (cron) and manual runs.
- Minimal external dependencies (currently only PyYAML).

## Requirements

On the backup server:

- Python 3
- rsync
- ssh
- SSH key-based access from the backup server to remotes

## Canonical installation (no venv, repo used only for deployment)

This project uses a simple and explicit deployment model:

- The git clone is used only as a deployment input (and later for updates).
- Runtime code is deployed into /opt/pobsync/lib.
- The canonical entrypoint is /opt/pobsync/bin/pobsync.

### Install

```
git clone https://code.hosting.hippogrief.nl/hippogrief/pobsync.git
cd pobsync
sudo ./scripts/deploy --prefix /opt/pobsync
pobsync install --backup-root /mnt/backups/pobsync   # install default configuration
pobsync doctor                                       # verify the installation
```
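The hardlinking mentioned under "Key points" is what makes successive snapshots cheap: unchanged files in a new snapshot are hard links into the previous one, so they consume no extra space. A minimal Python sketch of that idea (illustrative only; pobsync itself delegates this to `rsync --link-dest`, and the paths here are made up):

```
import os
import tempfile
from pathlib import Path

# Simulate two snapshot directories where an unchanged file is
# hardlinked, mirroring what rsync --link-dest does between snapshots.
root = Path(tempfile.mkdtemp())
snap_a = root / "20260101-000000Z"   # hypothetical previous snapshot
snap_b = root / "20260102-000000Z"   # hypothetical new snapshot
snap_a.mkdir()
snap_b.mkdir()

(snap_a / "data.txt").write_text("unchanged contents\n")

# In the "new" snapshot the unchanged file is a hard link, not a copy.
os.link(snap_a / "data.txt", snap_b / "data.txt")

same_inode = os.stat(snap_a / "data.txt").st_ino == os.stat(snap_b / "data.txt").st_ino
print(same_inode)  # both snapshot entries share one inode, so no extra space is used
```

Deleting one snapshot directory later never breaks the other: the file's data survives as long as at least one link remains, which is why retention pruning is safe for hardlinked snapshots.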
### Update
```
cd /path/to/pobsync
git pull
sudo ./scripts/deploy --prefix /opt/pobsync
sudo /opt/pobsync/bin/pobsync doctor
```
## Configuration
Global configuration is stored at:
- /opt/pobsync/config/global.yaml
Per-host configuration files are stored at:
- /opt/pobsync/config/hosts/<host>.yaml
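For orientation, a minimal `global.yaml` might look like the following sketch. The `backup_root` key and the retention fields match the defaults visible in the CLI; any other keys are assumptions, so check the real schema with `pobsync show-config`:

```
backup_root: /mnt/backups/pobsync
retention_defaults:
  daily: 14
  weekly: 8
  monthly: 12
  yearly: 0
```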
## Some useful commands to get you started
Create a new host configuration:
`pobsync init-host <host>`
List configured remotes:
`pobsync list-remotes`
Inspect the effective configuration for a host:
`pobsync show-config <host>`
## Running backups
Run a scheduled backup for a host:
`pobsync run-scheduled <host>`
Optionally apply retention pruning after the run:
`pobsync run-scheduled <host> --prune`
## Scheduling (cron)
Create a cron schedule (writes into /etc/cron.d/pobsync by default):
`pobsync schedule create <host> --daily 02:15 --prune`
List existing schedules:
`pobsync schedule list`
Remove a schedule:
`pobsync schedule remove <host>`
Cron output is redirected to:
- /var/log/pobsync/<host>.cron.log
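A schedule created with the `--daily 02:15 --prune` example above would produce a cron.d entry along these lines (the exact generated format and the host name `web01` are assumptions; inspect `/etc/cron.d/pobsync` after running `schedule create`):

```
# illustrative sketch of a generated entry
15 2 * * * root /opt/pobsync/bin/pobsync run-scheduled web01 --prune >> /var/log/pobsync/web01.cron.log 2>&1
```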
## Development (optional)
For development you can still use an editable install; this is why `pyproject.toml` still exists:
```
python3 -m pip install -e .
pobsync --help
```
For production use, always use the canonical entrypoint:
/opt/pobsync/bin/pobsync

scripts/deploy Executable file

@@ -0,0 +1,97 @@
#!/bin/sh
# Deploy pobsync runtime into /opt/pobsync without pip/venv.
# Copies python package sources into /opt/pobsync/lib and installs a stable entrypoint in /opt/pobsync/bin.
set -eu

PREFIX="/opt/pobsync"

usage() {
    echo "Usage: $0 [--prefix /opt/pobsync]" >&2
    exit 2
}

while [ $# -gt 0 ]; do
    case "$1" in
        --prefix)
            [ $# -ge 2 ] || usage
            PREFIX="$2"
            shift 2
            ;;
        -h|--help)
            usage
            ;;
        *)
            echo "Unknown arg: $1" >&2
            usage
            ;;
    esac
done

# Determine repo root from this script location
SCRIPT_DIR="$(CDPATH= cd -- "$(dirname -- "$0")" && pwd)"
REPO_ROOT="$(CDPATH= cd -- "${SCRIPT_DIR}/.." && pwd)"
SRC_PKG="${REPO_ROOT}/src/pobsync"

if [ ! -d "${SRC_PKG}" ]; then
    echo "ERROR: expected python package at ${SRC_PKG}" >&2
    exit 1
fi

BIN_DIR="${PREFIX}/bin"
LIB_DIR="${PREFIX}/lib"
DST_PKG="${LIB_DIR}/pobsync"
BUILD_FILE="${DST_PKG}/_build.txt"

mkdir -p "${BIN_DIR}" "${LIB_DIR}"

# Copy code into /opt/pobsync/lib/pobsync.
# We use rsync if available (clean updates with --delete), otherwise fall back to cp -a.
if command -v rsync >/dev/null 2>&1; then
    rsync -a --delete \
        --exclude '__pycache__/' \
        --exclude '*.pyc' \
        --exclude '*.pyo' \
        --exclude '*.pyd' \
        "${SRC_PKG}/" "${DST_PKG}/"
else
    # Fallback: wipe + copy
    rm -rf "${DST_PKG}"
    mkdir -p "${DST_PKG}"
    cp -a "${SRC_PKG}/." "${DST_PKG}/"
fi

# Write build info (best-effort)
GIT_SHA="unknown"
if command -v git >/dev/null 2>&1 && [ -d "${REPO_ROOT}/.git" ]; then
    GIT_SHA="$(cd "${REPO_ROOT}" && git rev-parse HEAD 2>/dev/null || echo unknown)"
fi
NOW_UTC="$(date -u +"%Y-%m-%dT%H:%M:%SZ" 2>/dev/null || echo unknown)"
{
    echo "deployed_at_utc=${NOW_UTC}"
    echo "git_sha=${GIT_SHA}"
    echo "repo_root=${REPO_ROOT}"
} > "${BUILD_FILE}"

# Install stable entrypoint that always runs code from /opt/pobsync/lib
WRAPPER="${BIN_DIR}/pobsync"
cat > "${WRAPPER}" <<EOF
#!/bin/sh
# managed-by=pobsync deploy
set -eu
PREFIX="${PREFIX}"
export PYTHONPATH="\${PREFIX}/lib"
export PYTHONUNBUFFERED=1
exec /usr/bin/python3 -m pobsync "\$@"
EOF
chmod 0755 "${WRAPPER}"

echo "OK"
echo "- deployed package to ${DST_PKG}"
echo "- wrote build info ${BUILD_FILE}"
echo "- installed entrypoint ${WRAPPER}"


@@ -9,13 +9,18 @@ from .commands.doctor import run_doctor
from .commands.init_host import run_init_host
from .commands.install import run_install
from .commands.list_remotes import run_list_remotes
from .commands.retention_apply import run_retention_apply
from .commands.retention_plan import run_retention_plan
from .commands.run_scheduled import run_scheduled
from .commands.schedule_create import run_schedule_create
from .commands.schedule_list import run_schedule_list
from .commands.schedule_remove import run_schedule_remove
from .commands.show_config import dump_yaml, run_show_config
from .commands.snapshots_list import run_snapshots_list
from .commands.snapshots_show import run_snapshots_show
from .errors import LockError, PobsyncError
from .schedule import CRON_FILE_DEFAULT
from .util import to_json_safe


def build_parser() -> argparse.ArgumentParser:
@@ -67,6 +72,9 @@ def build_parser() -> argparse.ArgumentParser:
    rp = sub.add_parser("run-scheduled", help="Run a scheduled backup for a host")
    rp.add_argument("host", help="Host to back up")
    rp.add_argument("--dry-run", action="store_true", help="Run rsync --dry-run without creating directories")
    rp.add_argument("--prune", action="store_true", help="Apply retention after a successful run (default: false)")
    rp.add_argument("--prune-max-delete", type=int, default=10, help="Refuse to prune more than N snapshots (default: 10)")
    rp.add_argument("--prune-protect-bases", action="store_true", help="When pruning, also keep base snapshots referenced in meta")
    rp.set_defaults(_handler=cmd_run_scheduled)

    # snapshots
@@ -77,16 +85,72 @@ def build_parser() -> argparse.ArgumentParser:
    sn_list.add_argument("host", help="Host name")
    sn_list.add_argument("--kind", default="all", help="scheduled|manual|incomplete|all (default: all)")
    sn_list.add_argument("--limit", type=int, default=20, help="Max results (default: 20)")
    sn_list.add_argument("--include-incomplete", action="store_true", help="Include .incomplete when --kind=all")
    sn_list.set_defaults(_handler=cmd_snapshots_list)

    sn_show = sn_sub.add_parser("show", help="Show snapshot metadata")
    sn_show.add_argument("host", help="Host name")
    sn_show.add_argument("--kind", required=True, help="scheduled|manual|incomplete")
    sn_show.add_argument("dirname", help="Snapshot directory name")
    sn_show.add_argument("--tail", type=int, default=None, help="Show last N lines of rsync.log")
    sn_show.set_defaults(_handler=cmd_snapshots_show)

    # retention
    rt = sub.add_parser("retention", help="Retention management")
    rt_sub = rt.add_subparsers(dest="retention_cmd", required=True)

    rt_plan = rt_sub.add_parser("plan", help="Show retention prune plan (dry-run)")
    rt_plan.add_argument("host", help="Host name")
    rt_plan.add_argument("--kind", default="scheduled", help="scheduled|manual|all (default: scheduled)")
    rt_plan.add_argument("--protect-bases", action="store_true", help="Also keep base snapshots referenced in meta (default: false)")
    rt_plan.set_defaults(_handler=cmd_retention_plan)

    rt_apply = rt_sub.add_parser("apply", help="Apply retention plan (DESTRUCTIVE)")
    rt_apply.add_argument("host", help="Host name")
    rt_apply.add_argument("--kind", default="scheduled", help="scheduled|manual|all (default: scheduled)")
    rt_apply.add_argument("--protect-bases", action="store_true", help="Also keep base snapshots referenced in meta (default: false)")
    rt_apply.add_argument("--max-delete", type=int, default=10, help="Refuse to delete more than N snapshots (default: 10)")
    rt_apply.add_argument("--yes", action="store_true", help="Confirm deletion")
    rt_apply.set_defaults(_handler=cmd_retention_apply)

    # schedule
    sch = sub.add_parser("schedule", help="Manage cron schedules in /etc/cron.d/pobsync")
    sch_sub = sch.add_subparsers(dest="schedule_cmd", required=True)

    sch_create = sch_sub.add_parser("create", help="Create or update a schedule for a host")
    sch_create.add_argument("host", help="Host name")
    mode = sch_create.add_mutually_exclusive_group(required=True)
    mode.add_argument("--cron", default=None, help='Raw cron expression (5 fields), e.g. "15 2 * * *"')
    mode.add_argument("--daily", default=None, help="Daily at HH:MM")
    mode.add_argument("--hourly", type=int, default=None, help="Hourly at minute N (0..59)")
    mode.add_argument("--weekly", action="store_true", help="Weekly schedule (requires --dow and --time)")
    mode.add_argument("--monthly", action="store_true", help="Monthly schedule (requires --day and --time)")
    sch_create.add_argument("--dow", default=None, help="For --weekly: mon,tue,wed,thu,fri,sat,sun")
    sch_create.add_argument("--day", type=int, default=None, help="For --monthly: day of month (1..31)")
    sch_create.add_argument("--time", default=None, help="For --weekly/--monthly: HH:MM")
    sch_create.add_argument("--user", default="root", help="Cron user field (default: root)")
    sch_create.add_argument("--cron-file", default=CRON_FILE_DEFAULT, help="Cron file path (default: /etc/cron.d/pobsync)")
    sch_create.add_argument("--prune", action="store_true", help="Run retention prune after successful backup")
    sch_create.add_argument("--prune-max-delete", type=int, default=10, help="Prune guardrail (default: 10)")
    sch_create.add_argument("--prune-protect-bases", action="store_true", help="Prune with base protection (default: false)")
    sch_create.add_argument("--dry-run", action="store_true", help="Show actions, do not write")
    sch_create.set_defaults(_handler=cmd_schedule_create)

    sch_list = sch_sub.add_parser("list", help="List schedules from /etc/cron.d/pobsync")
    sch_list.add_argument("--host", default=None, help="Filter by host")
    sch_list.add_argument("--cron-file", default=CRON_FILE_DEFAULT, help="Cron file path (default: /etc/cron.d/pobsync)")
    sch_list.set_defaults(_handler=cmd_schedule_list)

    sch_remove = sch_sub.add_parser("remove", help="Remove schedule block for a host")
    sch_remove.add_argument("host", help="Host name")
    sch_remove.add_argument("--cron-file", default=CRON_FILE_DEFAULT, help="Cron file path (default: /etc/cron.d/pobsync)")
    sch_remove.add_argument("--dry-run", action="store_true", help="Show actions, do not write")
    sch_remove.set_defaults(_handler=cmd_schedule_remove)

    return p
@@ -124,24 +188,16 @@ def _print(result: dict[str, Any], as_json: bool) -> None:
        for a in result["actions"]:
            print(f"- {a}")

    if "results" in result:
        for r in result["results"]:
            label = "OK" if r.get("ok") else "FAIL"
            name = r.get("check", "check")
            msg = r.get("message") or r.get("error") or ""
            extra = ""
            if "path" in r:
                extra = f" ({r['path']})"
            elif "host" in r:
                extra = f" ({r['host']})"
            line = f"- {label} {name}{extra}"
            if msg:
                line += f" {msg}"
@@ -170,18 +226,65 @@ def _print(result: dict[str, Any], as_json: bool) -> None:
            extra = " " + extra
        print(f"- {kind} {dirname} {status}{extra}")

    if "keep" in result and "delete" in result:
        keep = result.get("keep") or []
        delete = result.get("delete") or []
        reasons = result.get("reasons") or {}
        total = len(keep) + len(delete)
        print(f"- total {total}")
        print(f"- keep {len(keep)}")
        print(f"- delete {len(delete)}")
        if result.get("protect_bases") is True:
            print("- protect_bases true")
        if keep:
            print("- keep:")
            for d in keep:
                rs = reasons.get(d) or []
                rs_s = f" ({', '.join(rs)})" if rs else ""
                print(f" - {d}{rs_s}")
        if delete:
            print("- delete:")
            for item in delete:
                dirname = item.get("dirname", "?")
                dt = item.get("dt") or ""
                status = item.get("status") or "unknown"
                kind = item.get("kind", "?")
                extra = " ".join(x for x in [kind, status, dt] if x)
                if extra:
                    extra = " " + extra
                print(f" - {dirname}{extra}")

    if "schedules" in result:
        for s in result["schedules"]:
            host = s.get("host", "?")
            cron = s.get("cron") or "unknown"
            user = s.get("user") or "unknown"
            prune = bool(s.get("prune", False))
            prune_max = s.get("prune_max_delete", None)
            protect = bool(s.get("prune_protect_bases", False))
            extra = ""
            if prune:
                extra = " prune"
                if isinstance(prune_max, int):
                    extra += f" max_delete={prune_max}"
                if protect:
                    extra += " protect_bases"
            print(f"- {host} {cron} {user}{extra}")
def cmd_install(args: argparse.Namespace) -> int:
    prefix = Path(args.prefix)
    retention = parse_retention(args.retention)
    result = run_install(
        prefix=prefix,
        backup_root=args.backup_root,
        retention=retention,
        dry_run=bool(args.dry_run),
        force=bool(args.force),
@@ -192,38 +295,15 @@ def cmd_install(args: argparse.Namespace) -> int:
def cmd_init_host(args: argparse.Namespace) -> int:
    prefix = Path(args.prefix)
    result = run_init_host(
        prefix=prefix,
        host=args.host,
        address=args.address,
        retention=args.retention,
        ssh_user=args.ssh_user,
        ssh_port=args.ssh_port,
        excludes_add=list(args.exclude_add),
        excludes_replace=args.exclude_replace,
        includes=list(args.include),
        dry_run=bool(args.dry_run),
        force=bool(args.force),
@@ -232,26 +312,9 @@ def cmd_init_host(args: argparse.Namespace) -> int:
    return 0 if result.get("ok") else 1


def cmd_doctor(args: argparse.Namespace) -> int:
    prefix = Path(args.prefix)
    result = run_doctor(prefix=prefix, host=args.host, connect=bool(args.connect), rsync_dry_run=bool(args.rsync_dry_run))
    _print(result, as_json=bool(args.json))
    return 0 if result.get("ok") else 1
@@ -263,9 +326,26 @@ def cmd_list_remotes(args: argparse.Namespace) -> int:
    return 0 if result.get("ok") else 1


def cmd_show_config(args: argparse.Namespace) -> int:
    prefix = Path(args.prefix)
    result = run_show_config(prefix=prefix, host=args.host, effective=bool(args.effective))
    if args.json:
        _print(result, as_json=True)
    else:
        print(dump_yaml(result["config"]).rstrip())
    return 0 if result.get("ok") else 1


def cmd_run_scheduled(args: argparse.Namespace) -> int:
    prefix = Path(args.prefix)
    result = run_scheduled(
        prefix=prefix,
        host=args.host,
        dry_run=bool(args.dry_run),
        prune=bool(args.prune),
        prune_max_delete=int(args.prune_max_delete),
        prune_protect_bases=bool(args.prune_protect_bases),
    )
    _print(result, as_json=bool(args.json))
    return 0 if result.get("ok") else 2
@@ -285,26 +365,74 @@ def cmd_snapshots_list(args: argparse.Namespace) -> int:
def cmd_snapshots_show(args: argparse.Namespace) -> int:
    prefix = Path(args.prefix)
    result = run_snapshots_show(prefix=prefix, host=args.host, kind=args.kind, dirname=args.dirname, tail=args.tail)
    if args.json:
        _print(result, as_json=True)
    else:
        print(dump_yaml(result.get("meta", {})).rstrip())
        if result.get("log_path"):
            print(f"\n# rsync.log: {result['log_path']}")
        if result.get("log_tail"):
            print("\n# rsync.log (tail)")
            for line in result["log_tail"]:
                print(line)
    return 0 if result.get("ok") else 1


def cmd_retention_plan(args: argparse.Namespace) -> int:
    prefix = Path(args.prefix)
    result = run_retention_plan(prefix=prefix, host=args.host, kind=args.kind, protect_bases=bool(args.protect_bases))
    _print(result, as_json=bool(args.json))
    return 0 if result.get("ok") else 1


def cmd_retention_apply(args: argparse.Namespace) -> int:
    prefix = Path(args.prefix)
    result = run_retention_apply(
        prefix=prefix,
        host=args.host,
        kind=args.kind,
        protect_bases=bool(args.protect_bases),
        yes=bool(args.yes),
        max_delete=int(args.max_delete),
    )
    _print(result, as_json=bool(args.json))
    return 0 if result.get("ok") else 1


def cmd_schedule_create(args: argparse.Namespace) -> int:
    prefix = Path(args.prefix)
    result = run_schedule_create(
        host=args.host,
        prefix=prefix,
        cron_file=Path(args.cron_file),
        cron_expr=args.cron,
        daily=args.daily,
        hourly=args.hourly,
        weekly=bool(args.weekly),
        dow=args.dow,
        time=args.time,
        monthly=bool(args.monthly),
        day=args.day,
        user=args.user,
        prune=bool(args.prune),
        prune_max_delete=int(args.prune_max_delete),
        prune_protect_bases=bool(args.prune_protect_bases),
        dry_run=bool(args.dry_run),
    )
    _print(result, as_json=bool(args.json))
    return 0 if result.get("ok") else 1


def cmd_schedule_list(args: argparse.Namespace) -> int:
    result = run_schedule_list(cron_file=Path(args.cron_file), host=args.host)
    _print(result, as_json=bool(args.json))
    return 0 if result.get("ok") else 1


def cmd_schedule_remove(args: argparse.Namespace) -> int:
    result = run_schedule_remove(host=args.host, cron_file=Path(args.cron_file), dry_run=bool(args.dry_run))
    _print(result, as_json=bool(args.json))
    return 0 if result.get("ok") else 1


@@ -2,14 +2,17 @@ from __future__ import annotations
import os
import shutil
import subprocess
from pathlib import Path
from typing import Any

from ..config.load import load_global_config, load_host_config
from ..errors import DoctorError
from ..paths import PobsyncPaths
from ..util import is_absolute_non_root

CRON_FILE_DEFAULT = Path("/etc/cron.d/pobsync")
LOG_DIR_DEFAULT = Path("/var/log/pobsync")


def _check_binary(name: str) -> tuple[bool, str]:
    p = shutil.which(name)
@@ -32,6 +35,77 @@ def _check_writable_dir(path: Path) -> tuple[bool, str]:
    return True, f"ok: writable {path}"


def _run(cmd: list[str]) -> subprocess.CompletedProcess[str]:
    return subprocess.run(
        cmd,
        check=False,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
    )


def _check_cron_service() -> tuple[bool, str]:
    """
    Best-effort check: verify cron service is active on systemd hosts.
    If systemctl is missing, we don't fail doctor phase 1.
    """
    systemctl = shutil.which("systemctl")
    if not systemctl:
        return True, "ok: systemctl not found; cannot verify cron service status"
    for svc in ("cron", "crond"):
        cp = _run([systemctl, "is-active", svc])
        if cp.returncode == 0 and cp.stdout.strip() == "active":
            return True, f"ok: cron service active ({svc})"
    return False, "cron service not active (tried: cron, crond)"


def _check_cron_file_permissions(path: Path) -> tuple[bool, str]:
    """
    /etc/cron.d files must not be writable by group/other.
    Owner should be root.
    Mode can be 0600 or 0644 (both ok as long as not group/other-writable).
    """
    if not path.exists():
        return True, f"ok: cron file not present ({path}); schedule may not be configured yet"
    try:
        st = path.stat()
    except OSError as e:
        return False, f"cannot stat cron file {path}: {e}"
    if not path.is_file():
        return False, f"cron file is not a regular file: {path}"
    problems: list[str] = []
    # root owner
    if st.st_uid != 0:
        problems.append("owner is not root")
    # must not be group/other writable
    if (st.st_mode & 0o022) != 0:
        problems.append("writable by group/other")
    mode_octal = oct(st.st_mode & 0o777)
    if problems:
        return False, f"unsafe cron file permissions/ownership for {path} (mode={mode_octal}): {', '.join(problems)}"
    return True, f"ok: cron file permissions/ownership OK ({path}, mode={mode_octal})"


def _check_pobsync_executable(prefix: Path) -> tuple[bool, str]:
    exe = prefix / "bin" / "pobsync"
    if not exe.exists():
        return False, f"missing executable: {exe}"
    if not os.access(str(exe), os.X_OK):
        return False, f"not executable: {exe}"
    return True, f"ok: executable {exe}"


def run_doctor(prefix: Path, host: str | None, connect: bool, rsync_dry_run: bool) -> dict[str, Any]:
    # Phase 1 doctor does not perform network checks yet (connect/rsync_dry_run acknowledged).
    paths = PobsyncPaths(home=prefix)
@@ -63,6 +137,7 @@ def run_doctor(prefix: Path, host: str | None, connect: bool, rsync_dry_run: boo
    b1, m1 = _check_binary("rsync")
    results.append({"check": "binary", "name": "rsync", "ok": b1, "message": m1})
    ok = ok and b1

    b2, m2 = _check_binary("ssh")
    results.append({"check": "binary", "name": "ssh", "ok": b2, "message": m2})
    ok = ok and b2
@@ -81,6 +156,36 @@ def run_doctor(prefix: Path, host: str | None, connect: bool, rsync_dry_run: boo
    else:
        results.append({"check": "backup_root", "ok": False, "error": "global config not loaded"})

    # ---- Scheduling checks (Step 1) ----
    c_ok, c_msg = _check_cron_service()
    results.append({"check": "schedule_cron_service", "ok": c_ok, "message": c_msg})
    ok = ok and c_ok

    f_ok, f_msg = _check_cron_file_permissions(CRON_FILE_DEFAULT)
    results.append({"check": "schedule_cron_file", "path": str(CRON_FILE_DEFAULT), "ok": f_ok, "message": f_msg})
    ok = ok and f_ok

    # We treat missing log dir as a warning rather than hard-fail in phase 1:
    # cron redirection may fail, but backups can still run.
    if LOG_DIR_DEFAULT.exists():
        l_ok, l_msg = _check_writable_dir(LOG_DIR_DEFAULT)
        results.append({"check": "schedule_log_dir", "path": str(LOG_DIR_DEFAULT), "ok": l_ok, "message": l_msg})
        ok = ok and l_ok
    else:
        results.append(
            {
                "check": "schedule_log_dir",
                "path": str(LOG_DIR_DEFAULT),
                "ok": True,
                "message": f"ok: log dir does not exist ({LOG_DIR_DEFAULT}); cron redirection may fail (backlog: create in install)",
            }
        )

    e_ok, e_msg = _check_pobsync_executable(prefix)
    results.append({"check": "schedule_pobsync_executable", "path": str(prefix / "bin" / "pobsync"), "ok": e_ok, "message": e_msg})
    ok = ok and e_ok

    # host checks
    if host is not None:
        host_path = paths.hosts_dir / f"{host}.yaml"


@@ -99,6 +99,25 @@ def write_yaml(path: Path, data: dict[str, Any], dry_run: bool, force: bool) ->
    return f"write {path}"


def _ensure_system_log_dir(dry_run: bool) -> list[str]:
    """
    Best-effort: create /var/log/pobsync to match cron redirection.
    Not fatal if it fails (e.g., insufficient permissions in a non-root install attempt).
    Note: the canonical entrypoint (/opt/pobsync/bin/pobsync) is owned by scripts/deploy;
    install only prepares the runtime layout and config.
    """
    actions: list[str] = []
    log_dir = Path("/var/log/pobsync")
    actions.append(f"mkdir -p {log_dir}")
    if not dry_run:
        try:
            ensure_dir(log_dir)
        except OSError as e:
            actions.append(f"warn: cannot create {log_dir}: {e}")
    return actions


def run_install(
    prefix: Path,
    backup_root: str | None,
@@ -118,6 +137,10 @@ def run_install(
    global_cfg = build_default_global_config(paths.home, backup_root=backup_root, retention=retention)
    actions.append(write_yaml(paths.global_config_path, global_cfg, dry_run=dry_run, force=force))

    # Install polish: ensure cron log directory exists.
    # Code + entrypoint deployment is handled by scripts/deploy.
    actions.extend(_ensure_system_log_dir(dry_run=dry_run))

    return {
        "ok": True,
        "actions": actions,


@@ -0,0 +1,103 @@
from __future__ import annotations

import shutil
from pathlib import Path
from typing import Any, Dict, List

from ..errors import ConfigError
from ..lock import acquire_host_lock
from ..paths import PobsyncPaths
from ..util import sanitize_host
from .retention_plan import run_retention_plan


def run_retention_apply(
    prefix: Path,
    host: str,
    kind: str,
    protect_bases: bool,
    yes: bool,
    max_delete: int,
    acquire_lock: bool = True,
) -> dict[str, Any]:
    host = sanitize_host(host)
    if kind not in {"scheduled", "manual", "all"}:
        raise ConfigError("kind must be scheduled, manual, or all")
    if not yes:
        raise ConfigError("Refusing to delete snapshots without --yes")
    if max_delete < 0:
        raise ConfigError("--max-delete must be >= 0")

    paths = PobsyncPaths(home=prefix)

    def _do_apply() -> dict[str, Any]:
        plan = run_retention_plan(prefix=prefix, host=host, kind=kind, protect_bases=bool(protect_bases))
        delete_list = plan.get("delete") or []
        if not isinstance(delete_list, list):
            raise ConfigError("Invalid retention plan output: delete is not a list")
        if max_delete == 0 and len(delete_list) > 0:
            raise ConfigError("Deletion blocked by --max-delete=0")
        if len(delete_list) > max_delete:
            raise ConfigError(f"Refusing to delete {len(delete_list)} snapshots (exceeds --max-delete={max_delete})")

        actions: List[str] = []
        deleted: List[Dict[str, Any]] = []
        for item in delete_list:
            if not isinstance(item, dict):
                continue
            dirname = item.get("dirname")
            snap_kind = item.get("kind")
            snap_path = item.get("path")
            if not isinstance(dirname, str) or not isinstance(snap_kind, str) or not isinstance(snap_path, str):
                continue
            # Hard safety: only allow scheduled/manual deletions from plan
            if snap_kind not in {"scheduled", "manual"}:
                raise ConfigError(f"Refusing to delete unsupported snapshot kind: {snap_kind!r}")
            p = Path(snap_path)
            if not p.exists():
                actions.append(f"skip missing {snap_kind}/{dirname}")
                continue
            if not p.is_dir():
                raise ConfigError(f"Refusing to delete non-directory path: {snap_path}")
            shutil.rmtree(p)
            actions.append(f"deleted {snap_kind} {dirname}")
            deleted.append(
                {
                    "dirname": dirname,
                    "kind": snap_kind,
                    "path": snap_path,
                }
            )
        return {
            "ok": True,
            "host": host,
            "kind": kind,
            "protect_bases": bool(protect_bases),
            "max_delete": max_delete,
            "deleted": deleted,
            "actions": actions,
        }

    if acquire_lock:
        with acquire_host_lock(paths.locks_dir, host, command="retention-apply"):
            return _do_apply()
    # Caller guarantees locking (used by run-scheduled)
    return _do_apply()

View File

@@ -0,0 +1,164 @@
from __future__ import annotations
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from ..config.load import load_global_config, load_host_config
from ..config.merge import build_effective_config
from ..errors import ConfigError
from ..paths import PobsyncPaths
from ..retention import Snapshot, build_retention_plan
from ..snapshot_meta import iter_snapshot_dirs, read_snapshot_meta, resolve_host_root
from ..util import sanitize_host
def _parse_snapshot_dt(dirname: str, meta: dict) -> datetime:
ts = meta.get("started_at")
if isinstance(ts, str) and ts.endswith("Z"):
try:
return datetime.strptime(ts, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=timezone.utc)
except ValueError:
pass
# fallback: dirname YYYYMMDD-HHMMSSZ__ID
try:
prefix = dirname.split("__", 1)[0]
return datetime.strptime(prefix, "%Y%m%d-%H%M%SZ").replace(tzinfo=timezone.utc)
except Exception:
return datetime.fromtimestamp(0, tz=timezone.utc)
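The dirname fallback above relies on the `<YYYYMMDD-HHMMSSZ>__<id>` snapshot directory layout. A standalone sketch of that parse (the dirname value here is hypothetical):

```python
from datetime import datetime, timezone

# Hypothetical snapshot dirname in the "<YYYYMMDD-HHMMSSZ>__<id>" layout.
dirname = "20240315-021500Z__abc123"
stamp = dirname.split("__", 1)[0]
dt = datetime.strptime(stamp, "%Y%m%d-%H%M%SZ").replace(tzinfo=timezone.utc)
print(dt.isoformat())  # 2024-03-15T02:15:00+00:00
```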
def _apply_base_protection(
snapshots: List[Snapshot],
keep: set[str],
reasons: Dict[str, List[str]],
) -> Tuple[set[str], Dict[str, List[str]]]:
"""
Optional policy: if a kept snapshot has a base (kind+dirname), also keep that base snapshot.
This is NOT required for hardlink snapshots to remain readable, but can be useful
for performance (better base selection) or "chain" readability.
Adds reason: "base-of:<child_dirname>"
"""
# Index snapshots by (kind, dirname)
idx: Dict[Tuple[str, str], Snapshot] = {(s.kind, s.dirname): s for s in snapshots}
changed = True
while changed:
changed = False
# Iterate over a stable list of current keep items
for child_dirname in list(keep):
# Find the child snapshot (may exist in multiple kinds; check both)
child: Optional[Snapshot] = None
for k in ("scheduled", "manual"):
child = idx.get((k, child_dirname))
if child is not None:
break
if child is None:
continue
base = child.base
if not isinstance(base, dict):
continue
base_kind = base.get("kind")
base_dirname = base.get("dirname")
if not isinstance(base_kind, str) or not isinstance(base_dirname, str):
continue
base_snap = idx.get((base_kind, base_dirname))
if base_snap is None:
# Base might have been pruned already or never existed; ignore.
continue
if base_dirname not in keep:
keep.add(base_dirname)
reasons.setdefault(base_dirname, []).append(f"base-of:{child_dirname}")
changed = True
return keep, reasons
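The fixpoint loop in `_apply_base_protection` reduces to a few lines over plain dicts; a toy sketch with a hypothetical three-snapshot base chain:

```python
# Hypothetical chain: snapshot "c" was based on "b", which was based on "a".
bases = {"c": "b", "b": "a"}
keep = {"c"}

changed = True
while changed:
    changed = False
    for child in list(keep):  # iterate a stable copy; keep mutates below
        base = bases.get(child)
        if base is not None and base not in keep:
            keep.add(base)
            changed = True

print(sorted(keep))  # ['a', 'b', 'c']
```

Each pass pulls in the bases of everything currently kept, so a whole chain is protected even though only its newest member was kept by retention rules.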
def run_retention_plan(prefix: Path, host: str, kind: str, protect_bases: bool) -> dict[str, Any]:
host = sanitize_host(host)
if kind not in {"scheduled", "manual", "all"}:
raise ConfigError("kind must be scheduled, manual, or all")
paths = PobsyncPaths(home=prefix)
global_cfg = load_global_config(paths.global_config_path)
host_cfg = load_host_config(paths.hosts_dir / f"{host}.yaml")
cfg = build_effective_config(global_cfg, host_cfg)
retention = cfg.get("retention")
if not isinstance(retention, dict):
raise ConfigError("No retention config found")
backup_root = cfg.get("backup_root")
if not isinstance(backup_root, str) or not backup_root.startswith("/"):
raise ConfigError("Invalid backup_root in config")
host_root = resolve_host_root(backup_root, host)
kinds: List[str]
if kind == "all":
kinds = ["scheduled", "manual"]
else:
kinds = [kind]
snapshots: List[Snapshot] = []
for kk in kinds:
for d in iter_snapshot_dirs(host_root, kk):
meta = read_snapshot_meta(d)
dt = _parse_snapshot_dt(d.name, meta)
snapshots.append(
Snapshot(
kind=kk,
dirname=d.name,
path=str(d),
dt=dt,
status=meta.get("status"),
base=meta.get("base"),
)
)
plan = build_retention_plan(
snapshots=snapshots,
retention=retention,
now=datetime.now(timezone.utc),
)
keep = set(plan.keep)
reasons = dict(plan.reasons)
if protect_bases:
keep, reasons = _apply_base_protection(snapshots=snapshots, keep=keep, reasons=reasons)
delete = [s for s in snapshots if s.dirname not in keep]
return {
"ok": True,
"host": host,
"kind": kind,
"protect_bases": bool(protect_bases),
"retention": retention,
"keep": sorted(keep),
"delete": [
{
"dirname": s.dirname,
"kind": s.kind,
"path": s.path,
"dt": s.dt.isoformat(),
"status": s.status,
}
for s in delete
],
"reasons": reasons,
}

View File

@@ -18,6 +18,7 @@ from ..snapshot import (
snapshot_dir_name,
utc_now,
)
from ..snapshot_meta import read_snapshot_meta
from ..util import ensure_dir, realpath_startswith, sanitize_host, write_yaml_atomic
@@ -54,7 +55,13 @@ def select_scheduled_base(dirs: HostBackupDirs) -> Path | None:
return _find_latest_snapshot(dirs.manual)
def _base_meta_from_path(base_dir: Path | None, link_dest: str | None) -> dict[str, Any] | None:
"""
Build base metadata for meta.yaml.
Important: link_dest is the actual rsync --link-dest directory.
For our snapshot layout, that must be "<snapshot_dir>/data".
"""
if base_dir is None:
return None
@@ -63,15 +70,19 @@ def _base_meta_from_path(base_dir: Path | None) -> dict[str, Any] | None:
# Should not happen with current selection logic, but keep meta robust.
kind = "unknown"
base_meta = read_snapshot_meta(base_dir)
base_id = base_meta.get("id") if isinstance(base_meta.get("id"), str) else None
return {
"kind": kind,
"dirname": base_dir.name,
"id": base_id,
"path": link_dest,
}
def run_scheduled(prefix: Path, host: str, dry_run: bool, prune: bool = False, prune_max_delete: int | None = None, prune_protect_bases: bool = False) -> dict[str, Any]:
host = sanitize_host(host)
paths = PobsyncPaths(home=prefix)
@@ -92,7 +103,10 @@ def run_scheduled(prefix: Path, host: str, dry_run: bool) -> dict[str, Any]:
# Base snapshot (absolute path)
base_dir = select_scheduled_base(dirs)
# BUGFIX: rsync --link-dest must point at the snapshot "data" root, not the snapshot dir itself.
# Our destination root is "<incomplete>/data/", so the base root must be "<base>/data/".
link_dest = str(base_dir / "data") if base_dir else None
ssh_cfg = cfg.get("ssh", {}) or {}
rsync_cfg = cfg.get("rsync", {}) or {}
@@ -202,7 +216,7 @@ def run_scheduled(prefix: Path, host: str, dry_run: bool) -> dict[str, Any]:
"started_at": format_iso_z(ts),
"ended_at": None,
"duration_seconds": None,
"base": _base_meta_from_path(base_dir, link_dest),
"rsync": {"exit_code": None, "command": cmd, "stats": {}},
# Keep existing fields for future expansion / compatibility with current structure.
"overrides": {"includes": [], "excludes": [], "base": None},

View File

@@ -0,0 +1,162 @@
from __future__ import annotations
from pathlib import Path
from typing import Any, Optional
from ..errors import ConfigError
from ..schedule import (
build_cron_expr_daily,
build_cron_expr_hourly,
build_cron_expr_monthly,
build_cron_expr_weekly,
normalize_cron_expr,
render_host_block,
upsert_host_block,
validate_cron_expr,
)
from ..util import ensure_dir, sanitize_host, write_text_atomic
def _choose_cron_expr(
*,
cron_expr: Optional[str],
daily: Optional[str],
hourly: Optional[int],
weekly: bool,
dow: Optional[str],
time: Optional[str],
monthly: bool,
day: Optional[int],
) -> str:
modes = [
("cron", cron_expr is not None),
("daily", daily is not None),
("hourly", hourly is not None),
("weekly", bool(weekly)),
("monthly", bool(monthly)),
]
chosen = [name for name, enabled in modes if enabled]
if len(chosen) == 0:
raise ConfigError("One of --cron/--daily/--hourly/--weekly/--monthly must be provided")
if len(chosen) > 1:
raise ConfigError("Choose exactly one of --cron/--daily/--hourly/--weekly/--monthly")
if cron_expr is not None:
validate_cron_expr(cron_expr)
return normalize_cron_expr(cron_expr)
if daily is not None:
return build_cron_expr_daily(daily)
if hourly is not None:
return build_cron_expr_hourly(hourly)
if weekly:
if dow is None or time is None:
raise ConfigError("--weekly requires --dow and --time")
return build_cron_expr_weekly(dow, time)
# monthly
if day is None or time is None:
raise ConfigError("--monthly requires --day and --time")
return build_cron_expr_monthly(day, time)
def run_schedule_create(
*,
host: str,
prefix: Path,
cron_file: Path,
cron_expr: Optional[str],
daily: Optional[str],
hourly: Optional[int],
weekly: bool,
dow: Optional[str],
time: Optional[str],
monthly: bool,
day: Optional[int],
user: str,
prune: bool,
prune_max_delete: int,
prune_protect_bases: bool,
dry_run: bool,
) -> dict[str, Any]:
host = sanitize_host(host)
if prune_max_delete < 0:
raise ConfigError("--prune-max-delete must be >= 0")
expr = _choose_cron_expr(
cron_expr=cron_expr,
daily=daily,
hourly=hourly,
weekly=weekly,
dow=dow,
time=time,
monthly=monthly,
day=day,
)
cmd = f"{prefix}/bin/pobsync --prefix {prefix} run-scheduled {host}"
if prune:
cmd += " --prune"
cmd += f" --prune-max-delete {int(prune_max_delete)}"
if prune_protect_bases:
cmd += " --prune-protect-bases"
log_dir = Path("/var/log/pobsync")
log_path = str(log_dir / f"{host}.cron.log")
block = render_host_block(
host=host,
cron_expr=expr,
user=user,
command=cmd,
log_path=log_path,
include_env=True,
)
try:
existing = cron_file.read_text(encoding="utf-8")
except FileNotFoundError:
existing = ""
except PermissionError as e:
raise ConfigError(f"Permission denied reading {cron_file}: {e}") from e
except OSError as e:
raise ConfigError(f"Failed reading {cron_file}: {e}") from e
had_block = f"# BEGIN POBSYNC host={host}" in existing
new_content = upsert_host_block(existing, host, block)
action_word = "updated" if had_block else "created"
actions = [
f"schedule {action_word} host={host}",
f"file {cron_file}",
f"cron {expr}",
f"user {user}",
]
if prune:
actions.append(f"prune enabled (max_delete={int(prune_max_delete)})")
if prune_protect_bases:
actions.append("prune protect_bases enabled")
if dry_run:
actions.append("dry-run (no file written)")
return {"ok": True, "actions": actions, "host": host, "cron_file": str(cron_file)}
# Best-effort ensure log dir exists
try:
ensure_dir(log_dir)
except Exception:
actions.append(f"warn: could not create {log_dir}")
try:
write_text_atomic(cron_file, new_content)
except PermissionError as e:
raise ConfigError(f"Permission denied writing {cron_file}: {e}") from e
except OSError as e:
raise ConfigError(f"Failed writing {cron_file}: {e}") from e
return {"ok": True, "actions": actions, "host": host, "cron_file": str(cron_file)}

View File

@@ -0,0 +1,87 @@
from __future__ import annotations
from pathlib import Path
from typing import Any, Dict, List, Optional
from ..errors import ConfigError
from ..schedule import parse_cron_file
from ..util import sanitize_host
def _parse_prune_flags(command: Optional[str]) -> Dict[str, Any]:
"""
Best-effort parse of flags from the command string that we generate.
"""
if not command:
return {"prune": False, "prune_max_delete": None, "prune_protect_bases": False}
tokens = command.split()
prune = "--prune" in tokens
protect = "--prune-protect-bases" in tokens
max_delete = None
if "--prune-max-delete" in tokens:
try:
idx = tokens.index("--prune-max-delete")
if idx + 1 < len(tokens):
max_delete = int(tokens[idx + 1])
except (ValueError, IndexError):
max_delete = None
return {
"prune": bool(prune),
"prune_max_delete": max_delete,
"prune_protect_bases": bool(protect),
}
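The token-based flag recovery above, applied to a command string of the shape `schedule create` generates (paths hypothetical):

```python
# Hypothetical command string as generated by "pobsync schedule create".
cmd = "/opt/pobsync/bin/pobsync --prefix /opt/pobsync run-scheduled web1 --prune --prune-max-delete 5"
tokens = cmd.split()

prune = "--prune" in tokens  # exact token match, so --prune-max-delete does not count
idx = tokens.index("--prune-max-delete")
max_delete = int(tokens[idx + 1]) if idx + 1 < len(tokens) else None

print(prune, max_delete)  # True 5
```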
def run_schedule_list(*, cron_file: Path, host: Optional[str]) -> dict[str, Any]:
if host is not None:
host = sanitize_host(host)
try:
content = cron_file.read_text(encoding="utf-8")
except FileNotFoundError:
content = ""
except PermissionError as e:
raise ConfigError(f"Permission denied reading {cron_file}: {e}") from e
except OSError as e:
raise ConfigError(f"Failed reading {cron_file}: {e}") from e
blocks = parse_cron_file(content)
schedules: List[Dict[str, Any]] = []
if host is not None:
b = blocks.get(host)
if b is None:
return {"ok": True, "cron_file": str(cron_file), "schedules": []}
flags = _parse_prune_flags(b.command)
schedules.append(
{
"host": b.host,
"cron": b.cron_expr,
"user": b.user,
"command": b.command,
"log_path": b.log_path,
**flags,
}
)
return {"ok": True, "cron_file": str(cron_file), "schedules": schedules}
for h in sorted(blocks.keys()):
b = blocks[h]
flags = _parse_prune_flags(b.command)
schedules.append(
{
"host": b.host,
"cron": b.cron_expr,
"user": b.user,
"command": b.command,
"log_path": b.log_path,
**flags,
}
)
return {"ok": True, "cron_file": str(cron_file), "schedules": schedules}

View File

@@ -0,0 +1,39 @@
from __future__ import annotations
from pathlib import Path
from typing import Any
from ..errors import ConfigError
from ..schedule import remove_host_block
from ..util import sanitize_host, write_text_atomic
def run_schedule_remove(*, host: str, cron_file: Path, dry_run: bool) -> dict[str, Any]:
host = sanitize_host(host)
try:
existing = cron_file.read_text(encoding="utf-8")
except FileNotFoundError:
existing = ""
except PermissionError as e:
raise ConfigError(f"Permission denied reading {cron_file}: {e}") from e
except OSError as e:
raise ConfigError(f"Failed reading {cron_file}: {e}") from e
new_content = remove_host_block(existing, host)
actions = [f"schedule remove host={host}", f"file {cron_file}"]
if dry_run:
actions.append("dry-run (no file written)")
return {"ok": True, "actions": actions, "host": host, "cron_file": str(cron_file)}
try:
write_text_atomic(cron_file, new_content)
except PermissionError as e:
raise ConfigError(f"Permission denied writing {cron_file}: {e}") from e
except OSError as e:
raise ConfigError(f"Failed writing {cron_file}: {e}") from e
return {"ok": True, "actions": actions, "host": host, "cron_file": str(cron_file)}

View File

@@ -0,0 +1,228 @@
from __future__ import annotations
import os
import shutil
import stat
import subprocess
from dataclasses import dataclass
from typing import Any, Dict, List, Optional
CRON_FILE_DEFAULT = "/etc/cron.d/pobsync"
LOG_DIR_DEFAULT = "/var/log/pobsync"
@dataclass(frozen=True)
class DoctorCheck:
name: str
ok: bool
severity: str # "error" | "warning" | "info"
message: str
details: Optional[Dict[str, Any]] = None
def _run(cmd: List[str]) -> subprocess.CompletedProcess[str]:
return subprocess.run(
cmd,
check=False,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
)
def _check_cron_service() -> DoctorCheck:
systemctl = shutil.which("systemctl")
if not systemctl:
return DoctorCheck(
name="schedule.cron_service",
ok=True,
severity="warning",
message="systemctl not found; cannot verify cron service status",
details={"hint": "If cron isn't running, schedules won't execute."},
)
# Try both common service names
for svc in ("cron", "crond"):
cp = _run([systemctl, "is-active", svc])
if cp.returncode == 0 and cp.stdout.strip() == "active":
return DoctorCheck(
name="schedule.cron_service",
ok=True,
severity="info",
message=f"cron service is active ({svc})",
)
# Not active / unknown
return DoctorCheck(
name="schedule.cron_service",
ok=False,
severity="error",
message="cron service is not active (tried: cron, crond)",
details={"hint": "Enable/start cron (systemctl enable --now cron) or the equivalent on your distro."},
)
def _check_cron_file_permissions(cron_file: str) -> DoctorCheck:
try:
st = os.stat(cron_file)
except FileNotFoundError:
return DoctorCheck(
name="schedule.cron_file",
ok=True,
severity="warning",
message=f"cron file not found: {cron_file}",
details={"hint": "Create one via: pobsync schedule create <host> ..."},
)
except OSError as e:
return DoctorCheck(
name="schedule.cron_file",
ok=False,
severity="error",
message=f"cannot stat cron file: {cron_file}",
details={"error": str(e)},
)
if not stat.S_ISREG(st.st_mode):
return DoctorCheck(
name="schedule.cron_file",
ok=False,
severity="error",
message=f"cron file is not a regular file: {cron_file}",
)
problems: List[str] = []
if st.st_uid != 0:
problems.append("owner is not root")
# For /etc/cron.d, file must NOT be group/other writable.
# (Mode may be 600 or 644; both are fine as long as not writable by group/other.)
if (st.st_mode & 0o022) != 0:
problems.append("cron file is writable by group/other (must not be)")
mode_octal = oct(st.st_mode & 0o777)
if problems:
return DoctorCheck(
name="schedule.cron_file",
ok=False,
severity="error",
message=f"cron file permissions/ownership look unsafe: {cron_file}",
details={"mode": mode_octal, "uid": st.st_uid, "problems": problems},
)
return DoctorCheck(
name="schedule.cron_file",
ok=True,
severity="info",
message=f"cron file permissions/ownership OK: {cron_file}",
details={"mode": mode_octal},
)
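The writability test above is a bitmask against the group-write and other-write permission bits; a quick standalone check of which common modes pass:

```python
# 0o022 masks the group-write (0o020) and other-write (0o002) bits.
for mode in (0o600, 0o644, 0o664, 0o666):
    unsafe = (mode & 0o022) != 0
    print(oct(mode), "unsafe" if unsafe else "ok")
```

So 600 and 644 are accepted, while 664 and 666 are flagged, matching cron.d's requirement that files not be writable by group or other.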
def _check_log_dir(log_dir: str) -> DoctorCheck:
if not os.path.exists(log_dir):
return DoctorCheck(
name="schedule.log_dir",
ok=True,
severity="warning",
message=f"log directory does not exist: {log_dir}",
details={"hint": "Not fatal, but cron output redirection may fail. Backlog item: create in install."},
)
if not os.path.isdir(log_dir):
return DoctorCheck(
name="schedule.log_dir",
ok=False,
severity="error",
message=f"log path exists but is not a directory: {log_dir}",
)
if not os.access(log_dir, os.W_OK):
return DoctorCheck(
name="schedule.log_dir",
ok=False,
severity="error",
message=f"log directory is not writable: {log_dir}",
)
return DoctorCheck(
name="schedule.log_dir",
ok=True,
severity="info",
message=f"log directory OK: {log_dir}",
)
def _check_pobsync_executable(prefix: str) -> DoctorCheck:
exe = os.path.join(prefix, "bin", "pobsync")
if not os.path.exists(exe):
return DoctorCheck(
name="schedule.pobsync_executable",
ok=False,
severity="error",
message=f"pobsync executable not found at {exe}",
details={"hint": "Your cron entry likely points here; verify /opt/pobsync installation."},
)
if not os.access(exe, os.X_OK):
return DoctorCheck(
name="schedule.pobsync_executable",
ok=False,
severity="error",
message=f"pobsync exists but is not executable: {exe}",
)
return DoctorCheck(
name="schedule.pobsync_executable",
ok=True,
severity="info",
message=f"pobsync executable OK: {exe}",
)
def scheduling_checks(prefix: str, cron_file: str = CRON_FILE_DEFAULT) -> List[DoctorCheck]:
return [
_check_cron_service(),
_check_cron_file_permissions(cron_file),
_check_log_dir(LOG_DIR_DEFAULT),
_check_pobsync_executable(prefix),
]
def extend_doctor_result(result: Dict[str, Any], *, prefix: str, cron_file: str = CRON_FILE_DEFAULT) -> Dict[str, Any]:
"""
Add scheduling-related checks into an existing doctor result dict.
This is designed to be additive and low-risk:
- If result has a "checks" list, we append items.
- If result has "ok", we AND it with any error-level failures.
"""
checks = scheduling_checks(prefix=prefix, cron_file=cron_file)
# Normalize result structure
existing = result.get("checks")
if not isinstance(existing, list):
existing = []
result["checks"] = existing
for c in checks:
existing.append(
{
"name": c.name,
"ok": c.ok,
"severity": c.severity,
"message": c.message,
"details": c.details or {},
}
)
# Update overall ok: errors make it false; warnings do not.
overall_ok = bool(result.get("ok", True))
for c in checks:
if c.severity == "error" and not c.ok:
overall_ok = False
result["ok"] = overall_ok
return result

125
src/pobsync/retention.py Normal file
View File

@@ -0,0 +1,125 @@
from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, Iterable, List, Optional, Set, Tuple
@dataclass(frozen=True)
class Snapshot:
kind: str # scheduled | manual
dirname: str
path: str
dt: datetime # UTC
status: Optional[str]
base: Optional[dict]
@dataclass
class RetentionResult:
keep: Set[str] # dirnames
reasons: Dict[str, List[str]]
def _bucket_day(dt: datetime) -> str:
return dt.strftime("%Y-%m-%d")
def _bucket_week(dt: datetime) -> str:
iso = dt.isocalendar()
return f"{iso.year}-W{iso.week:02d}"
def _bucket_month(dt: datetime) -> str:
return dt.strftime("%Y-%m")
def _bucket_year(dt: datetime) -> str:
return dt.strftime("%Y")
def _window_start(now: datetime, unit: str, count: int) -> datetime:
if count <= 0:
return now + timedelta(days=1)
if unit == "daily":
return (now - timedelta(days=count - 1)).replace(hour=0, minute=0, second=0, microsecond=0)
if unit == "weekly":
return now - timedelta(weeks=count - 1)
if unit == "monthly":
return now.replace(day=1) - timedelta(days=32 * (count - 1))
if unit == "yearly":
return now.replace(month=1, day=1) - timedelta(days=366 * (count - 1))
raise ValueError(unit)
def build_retention_plan(
snapshots: Iterable[Snapshot],
retention: Dict[str, int],
now: Optional[datetime] = None,
) -> RetentionResult:
"""
Build a dry-run retention plan.
Returns:
- keep: set of snapshot dirnames to keep
- reasons: mapping dirname -> list of reasons why it is kept
"""
if now is None:
now = datetime.now(timezone.utc)
snaps = sorted(snapshots, key=lambda s: s.dt, reverse=True)
keep: Set[str] = set()
reasons: Dict[str, List[str]] = {}
def mark(dirname: str, reason: str) -> None:
keep.add(dirname)
reasons.setdefault(dirname, []).append(reason)
# Always keep newest snapshot overall (if any)
if snaps:
mark(snaps[0].dirname, "newest")
# Retention buckets
rules = [
("daily", retention.get("daily", 0), _bucket_day),
("weekly", retention.get("weekly", 0), _bucket_week),
("monthly", retention.get("monthly", 0), _bucket_month),
("yearly", retention.get("yearly", 0), _bucket_year),
]
for name, count, bucket_fn in rules:
if count <= 0:
continue
window_start = _window_start(now, name, count)
seen: Set[str] = set()
for s in snaps:
if s.dt < window_start:
break
bucket = bucket_fn(s.dt)
if bucket in seen:
continue
# Prefer successful snapshots, but allow fallback
if s.status not in (None, "success"):
continue
seen.add(bucket)
mark(s.dirname, f"{name}:{bucket}")
# Fallback: if a bucket had no success, allow newest non-success
for s in snaps:
if s.dt < window_start:
break
bucket = bucket_fn(s.dt)
if bucket in seen:
continue
seen.add(bucket)
mark(s.dirname, f"{name}:{bucket}:fallback")
return RetentionResult(keep=keep, reasons=reasons)
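The bucket key helpers decide which snapshots compete within a rule; note the weekly key uses the ISO year, which can differ from the calendar year near year boundaries. A standalone illustration (sketch mirrors of the `_bucket_*` helpers above):

```python
from datetime import datetime, timezone

def bucket_keys(dt: datetime) -> dict[str, str]:
    # Sketch mirrors of the _bucket_* helpers above.
    iso = dt.isocalendar()
    return {
        "daily": dt.strftime("%Y-%m-%d"),
        "weekly": f"{iso.year}-W{iso.week:02d}",
        "monthly": dt.strftime("%Y-%m"),
        "yearly": dt.strftime("%Y"),
    }

# 2024-12-30 falls in ISO week 1 of 2025, but in calendar year 2024.
print(bucket_keys(datetime(2024, 12, 30, tzinfo=timezone.utc)))
```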

235
src/pobsync/schedule.py Normal file
View File

@@ -0,0 +1,235 @@
from __future__ import annotations
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple
CRON_FILE_DEFAULT = "/etc/cron.d/pobsync"
BEGIN_PREFIX = "# BEGIN POBSYNC host="
END_PREFIX = "# END POBSYNC host="
@dataclass(frozen=True)
class ScheduleBlock:
host: str
raw_lines: List[str] # full block including begin/end markers
cron_expr: Optional[str] # "m h dom mon dow"
user: Optional[str]
command: Optional[str]
log_path: Optional[str]
def normalize_cron_expr(expr: str) -> str:
return " ".join(expr.strip().split())
def validate_cron_expr(expr: str) -> None:
parts = normalize_cron_expr(expr).split(" ")
if len(parts) != 5:
raise ValueError("cron expression must have exactly 5 fields (m h dom mon dow)")
def parse_hhmm(s: str) -> Tuple[int, int]:
s = s.strip()
if ":" not in s:
raise ValueError("time must be HH:MM")
a, b = s.split(":", 1)
if not a.isdigit() or not b.isdigit():
raise ValueError("time must be HH:MM")
h = int(a)
m = int(b)
if h < 0 or h > 23:
raise ValueError("hour must be 0..23")
if m < 0 or m > 59:
raise ValueError("minute must be 0..59")
return h, m
def parse_dow(s: str) -> int:
"""
Accept: mon,tue,wed,thu,fri,sat,sun (case-insensitive)
Return cron day-of-week number: 0=sun, 1=mon, ... 6=sat
"""
x = s.strip().lower()
mapping = {
"sun": 0,
"mon": 1,
"tue": 2,
"wed": 3,
"thu": 4,
"fri": 5,
"sat": 6,
}
if x not in mapping:
raise ValueError("dow must be one of: mon,tue,wed,thu,fri,sat,sun")
return mapping[x]
def build_cron_expr_daily(hhmm: str) -> str:
h, m = parse_hhmm(hhmm)
return f"{m} {h} * * *"
def build_cron_expr_hourly(minute: int = 0) -> str:
if minute < 0 or minute > 59:
raise ValueError("minute must be 0..59")
return f"{minute} * * * *"
def build_cron_expr_weekly(dow: str, hhmm: str) -> str:
h, m = parse_hhmm(hhmm)
dow_num = parse_dow(dow)
return f"{m} {h} * * {dow_num}"
def build_cron_expr_monthly(day: int, hhmm: str) -> str:
if day < 1 or day > 31:
raise ValueError("day must be 1..31")
h, m = parse_hhmm(hhmm)
return f"{m} {h} {day} * *"
def render_host_block(
host: str,
cron_expr: str,
user: str,
command: str,
log_path: Optional[str],
include_env: bool = True,
) -> str:
validate_cron_expr(cron_expr)
cron_expr = normalize_cron_expr(cron_expr)
lines: List[str] = []
lines.append(f"{BEGIN_PREFIX}{host}")
lines.append("# managed-by=pobsync")
if include_env:
lines.append("SHELL=/bin/sh")
lines.append("PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin")
cron_line = f"{cron_expr} {user} {command}"
if log_path:
cron_line += f" >>{log_path} 2>&1"
lines.append(cron_line)
lines.append(f"{END_PREFIX}{host}")
return "\n".join(lines) + "\n"
def parse_cron_file(content: str) -> Dict[str, ScheduleBlock]:
blocks: Dict[str, ScheduleBlock] = {}
lines = content.splitlines()
i = 0
while i < len(lines):
line = lines[i]
if line.startswith(BEGIN_PREFIX):
host = line[len(BEGIN_PREFIX) :].strip()
block_lines = [line]
i += 1
while i < len(lines):
block_lines.append(lines[i])
if lines[i].strip() == f"{END_PREFIX}{host}":
break
i += 1
cron_expr, user, command, log_path = _extract_cron_line(block_lines)
blocks[host] = ScheduleBlock(
host=host,
raw_lines=block_lines,
cron_expr=cron_expr,
user=user,
command=command,
log_path=log_path,
)
i += 1
return blocks
def _extract_cron_line(block_lines: List[str]) -> Tuple[Optional[str], Optional[str], Optional[str], Optional[str]]:
for raw in block_lines:
line = raw.strip()
if not line:
continue
if line.startswith("#"):
continue
# skip env-like lines
if "=" in line and line.split("=", 1)[0].isidentifier():
continue
parts = line.split()
if len(parts) < 7:
continue
cron_expr = " ".join(parts[0:5])
user = parts[5]
cmd = " ".join(parts[6:])
log_path = None
if ">>" in cmd:
before, after = cmd.split(">>", 1)
cmd = before.rstrip()
after_parts = after.strip().split()
if after_parts:
log_path = after_parts[0]
return cron_expr, user, cmd, log_path
return None, None, None, None
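The field extraction above splits on whitespace and peels off the `>>log 2>&1` redirection; applied to one hypothetical cron line:

```python
line = "30 2 * * * root /opt/pobsync/bin/pobsync run-scheduled web1 >>/var/log/pobsync/web1.cron.log 2>&1"
parts = line.split()

cron_expr = " ".join(parts[:5])   # first five fields: m h dom mon dow
user = parts[5]                   # sixth field: the cron.d user
cmd = " ".join(parts[6:])         # everything else: command plus redirection
log_path = None
if ">>" in cmd:
    before, after = cmd.split(">>", 1)
    cmd = before.rstrip()
    log_path = after.strip().split()[0]

print(cron_expr, "|", user, "|", cmd, "|", log_path)
```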
def upsert_host_block(content: str, host: str, new_block: str) -> str:
lines = content.splitlines()
out: List[str] = []
i = 0
replaced = False
begin = f"{BEGIN_PREFIX}{host}"
end = f"{END_PREFIX}{host}"
while i < len(lines):
if lines[i].strip() == begin:
replaced = True
# skip until end marker (inclusive)
i += 1
while i < len(lines) and lines[i].strip() != end:
i += 1
if i < len(lines):
i += 1 # skip end marker
out.extend(new_block.rstrip("\n").splitlines())
continue
out.append(lines[i])
i += 1
if not replaced:
if out and out[-1].strip() != "":
out.append("")
out.extend(new_block.rstrip("\n").splitlines())
return "\n".join(out).rstrip() + "\n"
def remove_host_block(content: str, host: str) -> str:
lines = content.splitlines()
out: List[str] = []
i = 0
begin = f"{BEGIN_PREFIX}{host}"
end = f"{END_PREFIX}{host}"
while i < len(lines):
if lines[i].strip() == begin:
i += 1
while i < len(lines) and lines[i].strip() != end:
i += 1
if i < len(lines):
i += 1 # skip end marker
continue
out.append(lines[i])
i += 1
return "\n".join(out).rstrip() + "\n"

View File

@@ -119,3 +119,57 @@ def write_yaml_atomic(path: Path, data: Any) -> None:
except OSError:
pass
def write_text_atomic(path: Path, content: str) -> None:
"""
Write text to `path` atomically.
Strategy:
- Write to a temp file in the same directory
- fsync temp file
- os.replace(temp, path) (atomic on POSIX)
- fsync directory entry (best-effort)
This helps avoid partial/corrupt files on crashes.
"""
parent = path.parent
parent.mkdir(parents=True, exist_ok=True)
tmp_fd: int | None = None
tmp_path: Path | None = None
try:
with tempfile.NamedTemporaryFile(
mode="w",
encoding="utf-8",
dir=str(parent),
prefix=path.name + ".",
suffix=".tmp",
delete=False,
) as tf:
tmp_fd = tf.fileno()
tmp_path = Path(tf.name)
tf.write(content)
tf.flush()
os.fsync(tmp_fd)
os.replace(str(tmp_path), str(path))
# Best-effort directory fsync (helps durability across power loss on some FS)
try:
dir_fd = os.open(str(parent), os.O_DIRECTORY)
try:
os.fsync(dir_fd)
finally:
os.close(dir_fd)
except OSError:
pass
finally:
# If anything failed before replace(), try to clean up temp file
if tmp_path is not None and tmp_path.exists():
try:
tmp_path.unlink()
except OSError:
pass
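A condensed standalone version of the temp-file-plus-`os.replace` strategy above (the function name is illustrative, not the project API):

```python
import os
import tempfile
from pathlib import Path

def atomic_write_sketch(path: Path, content: str) -> None:
    # Write to a temp file in the target directory, fsync, then atomically swap in.
    path.parent.mkdir(parents=True, exist_ok=True)
    fd, tmp = tempfile.mkstemp(dir=str(path.parent), prefix=path.name + ".")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            f.write(content)
            f.flush()
            os.fsync(f.fileno())
        os.replace(tmp, path)  # atomic on POSIX: readers see old or new, never partial
    except BaseException:
        try:
            os.unlink(tmp)
        except OSError:
            pass
        raise
```

After a successful `os.replace` the temp name is gone, so the cleanup path only runs on failure.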