(feature) Add global and effective host config checks

Introduce reusable configuration checks for global settings and effective
host runtime configuration. The checks now surface risky backup settings
such as missing recursive rsync args, missing critical root excludes,
invalid SSH settings, missing credentials, and retention gaps.

Show these checks on the global config form, host edit form, and host
detail page so operators can validate the compounded host/global config
before starting real backup runs.
This commit is contained in:
2026-05-19 20:24:29 +02:00
parent 7e5d31d53b
commit 088f43279e
6 changed files with 336 additions and 25 deletions

View File

@@ -0,0 +1,201 @@
from __future__ import annotations
import os
import shutil
from pathlib import Path
from django.conf import settings
from .models import GlobalConfig, HostConfig, SshCredential
from .self_check import SelfCheck
from .ssh_keys import identity_path
CRITICAL_ROOT_EXCLUDES = ("/proc/***", "/sys/***", "/dev/***", "/run/***", "/tmp/***")
def collect_global_config_checks(global_config: GlobalConfig) -> list[SelfCheck]:
checks = [
_absolute_path_check("Global backup root", global_config.backup_root),
_absolute_path_check("Global pobsync home", global_config.pobsync_home),
_runtime_backup_root_check(global_config),
_rsync_binary_check(global_config.rsync_binary),
_rsync_recursion_check(
"Global rsync recursion",
[*list(global_config.rsync_args or []), *list(global_config.rsync_extra_args or [])],
),
_source_root_check("Global source root", global_config.default_source_root),
_root_excludes_check(global_config.default_source_root, list(global_config.excludes_default or [])),
_retention_check(
"Global retention",
global_config.retention_daily,
global_config.retention_weekly,
global_config.retention_monthly,
global_config.retention_yearly,
),
_ssh_port_check("Global SSH port", global_config.ssh_port),
_credential_check("Global SSH credential", global_config.default_ssh_credential),
]
return checks
def collect_effective_host_config_checks(host: HostConfig, global_config: GlobalConfig) -> list[SelfCheck]:
source_root = host.source_root or global_config.default_source_root
ssh_user = host.ssh_user or global_config.ssh_user
ssh_port = host.ssh_port or global_config.ssh_port
credential = host.ssh_credential or global_config.default_ssh_credential
if host.excludes_replace is not None:
excludes = list(host.excludes_replace)
else:
excludes = [*list(global_config.excludes_default or []), *list(host.excludes_add or [])]
rsync_args = [
*list(global_config.rsync_args or []),
*list(global_config.rsync_extra_args or []),
*list(host.rsync_extra_args or []),
]
checks = [
_source_root_check("Host effective source root", source_root),
_ssh_user_check(ssh_user),
_ssh_port_check("Host effective SSH port", ssh_port),
_credential_check("Host effective SSH credential", credential),
_rsync_recursion_check("Host effective rsync recursion", rsync_args),
_root_excludes_check(source_root, excludes, host=host),
_includes_check(host),
_retention_check(
"Host retention",
host.retention_daily,
host.retention_weekly,
host.retention_monthly,
host.retention_yearly,
),
]
return checks
def has_recursive_rsync_arg(args: list[str]) -> bool:
for arg in args:
if arg in {"--archive", "--recursive"}:
return True
if arg.startswith("-") and not arg.startswith("--") and any(flag in arg for flag in ("a", "r")):
return True
return False
def _absolute_path_check(name: str, value: str) -> SelfCheck:
path = Path(value)
if not value:
return SelfCheck(name, "failed", "Path is empty.")
if not path.is_absolute():
return SelfCheck(name, "failed", f"{value} is not absolute.")
return SelfCheck(name, "ok", value)
def _runtime_backup_root_check(global_config: GlobalConfig) -> SelfCheck:
if global_config.backup_root == settings.POBSYNC_BACKUP_ROOT:
return SelfCheck("Runtime backup root", "ok", global_config.backup_root)
return SelfCheck(
"Runtime backup root",
"warning",
"Database backup root differs from runtime POBSYNC_BACKUP_ROOT.",
f"database={global_config.backup_root} runtime={settings.POBSYNC_BACKUP_ROOT}",
)
def _rsync_binary_check(binary: str) -> SelfCheck:
if not binary:
return SelfCheck("Global rsync binary", "failed", "Rsync binary is empty.")
if Path(binary).is_absolute():
exists = Path(binary).exists()
message = binary if exists else f"{binary} does not exist."
return SelfCheck("Global rsync binary", "ok" if exists else "failed", message)
path = shutil.which(binary)
return SelfCheck("Global rsync binary", "ok" if path else "failed", path or f"{binary} was not found in PATH.")
def _rsync_recursion_check(name: str, args: list[str]) -> SelfCheck:
if has_recursive_rsync_arg(args):
return SelfCheck(name, "ok", "Rsync args include archive or recursive transfer.", " ".join(args))
return SelfCheck(
name,
"failed",
"Rsync args do not include archive or recursive transfer.",
"Add --archive or --recursive before running a real backup.",
)
def _source_root_check(name: str, source_root: str) -> SelfCheck:
if not source_root:
return SelfCheck(name, "failed", "Source root is empty.")
if not source_root.startswith("/"):
return SelfCheck(name, "failed", f"{source_root} is not absolute.")
return SelfCheck(name, "ok", source_root)
def _root_excludes_check(source_root: str, excludes: list[str], host: HostConfig | None = None) -> SelfCheck:
if source_root != "/":
return SelfCheck("Effective root excludes", "ok", "Source root is not /, critical OS excludes are less important.")
missing = [pattern for pattern in CRITICAL_ROOT_EXCLUDES if pattern not in excludes]
if missing:
detail = ", ".join(missing)
if host and host.excludes_replace is not None:
detail = f"excludes_replace is active; missing {detail}"
return SelfCheck(
"Effective root excludes",
"warning",
"Source root is / but some critical default excludes are missing.",
detail,
)
return SelfCheck("Effective root excludes", "ok", "Critical root excludes are present.")
def _includes_check(host: HostConfig) -> SelfCheck:
includes = list(host.includes or [])
if not includes:
return SelfCheck("Host includes", "ok", "No host include rules are configured.")
return SelfCheck(
"Host includes",
"warning",
"Includes are passed to rsync as raw --include rules.",
"Verify matching exclude rules if you intend to limit the backup scope.",
)
def _retention_check(name: str, daily: int, weekly: int, monthly: int, yearly: int) -> SelfCheck:
if any(value > 0 for value in (daily, weekly, monthly, yearly)):
return SelfCheck(name, "ok", f"d{daily} w{weekly} m{monthly} y{yearly}")
return SelfCheck(name, "warning", "All retention windows are zero.")
def _ssh_user_check(user: str) -> SelfCheck:
if user.strip():
return SelfCheck("Host effective SSH user", "ok", user.strip())
return SelfCheck("Host effective SSH user", "failed", "SSH user is empty.")
def _ssh_port_check(name: str, port: int | None) -> SelfCheck:
if port is None:
return SelfCheck(name, "failed", "SSH port is empty.")
if 1 <= int(port) <= 65535:
return SelfCheck(name, "ok", str(port))
return SelfCheck(name, "failed", f"{port} is outside the valid TCP port range.")
def _credential_check(name: str, credential: SshCredential | None) -> SelfCheck:
if credential is None:
return SelfCheck(name, "warning", "No SSH credential selected.")
if credential.key_path:
key_path = identity_path(credential)
if not key_path.exists():
return SelfCheck(name, "failed", f"{key_path} does not exist.")
if not os.access(key_path, os.R_OK):
return SelfCheck(name, "failed", f"{key_path} is not readable by this process.")
return SelfCheck(name, "ok", str(credential), str(key_path))
if credential.private_key:
return SelfCheck(
name,
"warning",
f"{credential} stores private key material in the database.",
"Generated filesystem keys are recommended for native installs.",
)
return SelfCheck(name, "failed", f"{credential} has no private key material or key path.")