from __future__ import annotations import os import shutil from pathlib import Path from django.conf import settings from .models import GlobalConfig, HostConfig, SshCredential from .self_check import SelfCheck from .ssh_keys import identity_path CRITICAL_ROOT_EXCLUDES = ("/proc/***", "/sys/***", "/dev/***", "/run/***", "/tmp/***") def collect_global_config_checks(global_config: GlobalConfig) -> list[SelfCheck]: checks = [ _absolute_path_check("Global backup root", global_config.backup_root), _absolute_path_check("Global pobsync home", global_config.pobsync_home), _runtime_backup_root_check(global_config), _rsync_binary_check(global_config.rsync_binary), _rsync_recursion_check( "Global rsync recursion", [*list(global_config.rsync_args or []), *list(global_config.rsync_extra_args or [])], ), _source_root_check("Global source root", global_config.default_source_root), _root_excludes_check(global_config.default_source_root, list(global_config.excludes_default or [])), _retention_check( "Global retention", global_config.retention_daily, global_config.retention_weekly, global_config.retention_monthly, global_config.retention_yearly, ), _ssh_port_check("Global SSH port", global_config.ssh_port), _credential_check("Global SSH credential", global_config.default_ssh_credential), ] return checks def collect_effective_host_config_checks(host: HostConfig, global_config: GlobalConfig) -> list[SelfCheck]: source_root = host.source_root or global_config.default_source_root ssh_user = host.ssh_user or global_config.ssh_user ssh_port = host.ssh_port or global_config.ssh_port credential = host.ssh_credential or global_config.default_ssh_credential if host.excludes_replace is not None: excludes = list(host.excludes_replace) else: excludes = [*list(global_config.excludes_default or []), *list(host.excludes_add or [])] rsync_args = [ *list(global_config.rsync_args or []), *list(global_config.rsync_extra_args or []), *list(host.rsync_extra_args or []), ] checks = [ _source_root_check("Host effective source root", source_root), _ssh_user_check(ssh_user), _ssh_port_check("Host effective SSH port", ssh_port), _credential_check("Host effective SSH credential", credential), _rsync_recursion_check("Host effective rsync recursion", rsync_args), _root_excludes_check(source_root, excludes, host=host), _includes_check(host), _retention_check( "Host retention", host.retention_daily, host.retention_weekly, host.retention_monthly, host.retention_yearly, ), ] return checks def has_recursive_rsync_arg(args: list[str]) -> bool: for arg in args: if arg in {"--archive", "--recursive"}: return True if arg.startswith("-") and not arg.startswith("--") and any(flag in arg for flag in ("a", "r")): return True return False def _absolute_path_check(name: str, value: str) -> SelfCheck: path = Path(value) if not value: return SelfCheck(name, "failed", "Path is empty.") if not path.is_absolute(): return SelfCheck(name, "failed", f"{value} is not absolute.") return SelfCheck(name, "ok", value) def _runtime_backup_root_check(global_config: GlobalConfig) -> SelfCheck: if global_config.backup_root == settings.POBSYNC_BACKUP_ROOT: return SelfCheck("Runtime backup root", "ok", global_config.backup_root) return SelfCheck( "Runtime backup root", "warning", "Database backup root differs from runtime POBSYNC_BACKUP_ROOT.", f"database={global_config.backup_root} runtime={settings.POBSYNC_BACKUP_ROOT}", ) def _rsync_binary_check(binary: str) -> SelfCheck: if not binary: return SelfCheck("Global rsync binary", "failed", "Rsync binary is empty.") if Path(binary).is_absolute(): exists = Path(binary).exists() message = binary if exists else f"{binary} does not exist." return SelfCheck("Global rsync binary", "ok" if exists else "failed", message) path = shutil.which(binary) return SelfCheck("Global rsync binary", "ok" if path else "failed", path or f"{binary} was not found in PATH.") def _rsync_recursion_check(name: str, args: list[str]) -> SelfCheck: if has_recursive_rsync_arg(args): return SelfCheck(name, "ok", "Rsync args include archive or recursive transfer.", " ".join(args)) return SelfCheck( name, "failed", "Rsync args do not include archive or recursive transfer.", "Add --archive or --recursive before running a real backup.", ) def _source_root_check(name: str, source_root: str) -> SelfCheck: if not source_root: return SelfCheck(name, "failed", "Source root is empty.") if not source_root.startswith("/"): return SelfCheck(name, "failed", f"{source_root} is not absolute.") return SelfCheck(name, "ok", source_root) def _root_excludes_check(source_root: str, excludes: list[str], host: HostConfig | None = None) -> SelfCheck: if source_root != "/": return SelfCheck("Effective root excludes", "ok", "Source root is not /, critical OS excludes are less important.") missing = [pattern for pattern in CRITICAL_ROOT_EXCLUDES if pattern not in excludes] if missing: detail = ", ".join(missing) if host and host.excludes_replace is not None: detail = f"excludes_replace is active; missing {detail}" return SelfCheck( "Effective root excludes", "warning", "Source root is / but some critical default excludes are missing.", detail, ) return SelfCheck("Effective root excludes", "ok", "Critical root excludes are present.") def _includes_check(host: HostConfig) -> SelfCheck: includes = list(host.includes or []) if not includes: return SelfCheck("Host includes", "ok", "No host include rules are configured.") return SelfCheck( "Host includes", "warning", "Includes are passed to rsync as raw --include rules.", "Verify matching exclude rules if you intend to limit the backup scope.", ) def _retention_check(name: str, daily: int, weekly: int, monthly: int, yearly: int) -> SelfCheck: if any(value > 0 for value in (daily, weekly, monthly, yearly)): return SelfCheck(name, "ok", f"d{daily} w{weekly} m{monthly} y{yearly}") return SelfCheck(name, "warning", "All retention windows are zero.") def _ssh_user_check(user: str) -> SelfCheck: if user.strip(): return SelfCheck("Host effective SSH user", "ok", user.strip()) return SelfCheck("Host effective SSH user", "failed", "SSH user is empty.") def _ssh_port_check(name: str, port: int | None) -> SelfCheck: if port is None: return SelfCheck(name, "failed", "SSH port is empty.") if 1 <= int(port) <= 65535: return SelfCheck(name, "ok", str(port)) return SelfCheck(name, "failed", f"{port} is outside the valid TCP port range.") def _credential_check(name: str, credential: SshCredential | None) -> SelfCheck: if credential is None: return SelfCheck(name, "warning", "No SSH credential selected.") if credential.key_path: key_path = identity_path(credential) if not key_path.exists(): return SelfCheck(name, "failed", f"{key_path} does not exist.") if not os.access(key_path, os.R_OK): return SelfCheck(name, "failed", f"{key_path} is not readable by this process.") return SelfCheck(name, "ok", str(credential), str(key_path)) if credential.private_key: return SelfCheck( name, "warning", f"{credential} stores private key material in the database.", "Generated filesystem keys are recommended for native installs.", ) return SelfCheck(name, "failed", f"{credential} has no private key material or key path.")