Files
pobsync/src/pobsync_backend/preflight.py

233 lines
7.8 KiB
Python
Raw Normal View History

from __future__ import annotations
import shlex
import subprocess
from dataclasses import dataclass
from typing import Any
from pobsync.config.merge import build_effective_config
from pobsync.rsync import build_ssh_command
from .config_repository import global_config_object_data, host_config_object_data
from .config_source import DjangoConfigSource
from .host_ops import collect_host_checks
from .models import GlobalConfig, HostConfig
from .self_check import SelfCheck
# Names of host checks whose failure blocks dry-run backups as well as real
# ones: collect_backup_gate() treats every failed check as a real-backup
# blocker, but only failed checks whose name is listed here as dry-run blockers.
DRY_RUN_BLOCKING_CHECKS = {
    "Host global config",
    "Host address",
    "Host SSH key file",
    "Host effective source root",
    "Host effective SSH user",
    "Host effective SSH port",
    "Host effective SSH credential",
    "Host effective rsync recursion",
}
@dataclass(frozen=True)
class BackupGate:
    """Immutable verdict on whether backups may be queued for a host.

    ``state`` and ``message`` carry the overall result, ``checks`` holds the
    collected checks, and the three remaining lists hold the checks that block
    real runs, block dry runs, or merely warn.
    """

    state: str
    message: str
    checks: list[SelfCheck]
    real_blockers: list[SelfCheck]
    dry_run_blockers: list[SelfCheck]
    warnings: list[SelfCheck]

    @property
    def can_queue_real(self) -> bool:
        """Return True when nothing blocks a real backup run."""
        has_blockers = bool(self.real_blockers)
        return not has_blockers

    @property
    def can_queue_dry_run(self) -> bool:
        """Return True when nothing blocks a dry-run backup."""
        has_blockers = bool(self.dry_run_blockers)
        return not has_blockers
def collect_backup_gate(host: HostConfig, global_config: GlobalConfig | None = None) -> BackupGate:
    """Collect the host checks and condense them into a ``BackupGate``.

    The stored remote preflight result is appended to the host checks. Every
    failed check blocks real backups; failed checks whose name appears in
    ``DRY_RUN_BLOCKING_CHECKS`` also block dry runs. Checks with status
    ``"warning"`` never block but downgrade the state from ``"ready"``.
    """
    all_checks = collect_host_checks(host, global_config)
    preflight_check = _remote_preflight_self_check(host)
    if preflight_check is not None:
        all_checks.append(preflight_check)

    # Partition the checks by status in a single pass.
    failed: list[SelfCheck] = []
    cautions: list[SelfCheck] = []
    for item in all_checks:
        if item.status == "failed":
            failed.append(item)
        elif item.status == "warning":
            cautions.append(item)
    dry_run_failed = [item for item in failed if item.name in DRY_RUN_BLOCKING_CHECKS]

    if failed:
        verdict = ("blocked", "Real backups are blocked until failed host checks are resolved.")
    elif cautions:
        verdict = ("warning", "Backups can run, but review the warnings first.")
    else:
        verdict = ("ready", "This host is ready for backup runs.")
    state, message = verdict

    return BackupGate(
        state=state,
        message=message,
        checks=all_checks,
        real_blockers=failed,
        dry_run_blockers=dry_run_failed,
        warnings=cautions,
    )
def run_remote_preflight(host: HostConfig, *, timeout_seconds: int = 20) -> dict[str, Any]:
    """Probe the host over SSH and store the outcome on ``host.config``.

    Three checks are executed through ``_run_remote_check``: SSH reachability,
    presence of the configured rsync binary on the remote side, and
    existence plus readability of the effective source root.

    Args:
        host: Host whose effective configuration is resolved and probed.
        timeout_seconds: Timeout handed to each individual check.

    Returns:
        A dict with the overall ``ok`` flag, the per-check results, and the
        resolved ``target``, ``source_root``, ``rsync_binary`` and
        ``timeout_seconds``. The same dict is saved under
        ``host.config["last_preflight"]``.
    """
    config = DjangoConfigSource().effective_config_for_host(host.host)
    ssh_cfg = config.get("ssh", {}) or {}
    rsync_cfg = config.get("rsync", {}) or {}
    address = str(config.get("address") or host.address)
    user = str(ssh_cfg.get("user") or "root")
    source_root = str(config.get("source_root") or (config.get("defaults", {}) or {}).get("source_root") or "/")
    rsync_binary = str(rsync_cfg.get("binary") or "rsync")
    target = f"{user}@{address}"
    ssh_base = [*build_ssh_command(ssh_cfg), "-oBatchMode=yes", target]

    def remote_sh(script: str) -> list[str]:
        # ssh joins all remote arguments with spaces and the remote user's
        # shell re-parses the result. Without this extra level of quoting,
        # `sh -lc` would receive only the first word of the script (a bare
        # `command` or `test`) and the check would not test anything useful.
        return [*ssh_base, "sh", "-lc", shlex.quote(script)]

    quoted_binary = shlex.quote(rsync_binary)
    quoted_root = shlex.quote(source_root)
    checks = [
        _run_remote_check(
            name="SSH reachability",
            command=[*ssh_base, "true"],
            timeout_seconds=timeout_seconds,
        ),
        _run_remote_check(
            name="Remote rsync",
            command=remote_sh(f"command -v {quoted_binary} >/dev/null"),
            timeout_seconds=timeout_seconds,
        ),
        _run_remote_check(
            name="Remote source root",
            command=remote_sh(f"test -e {quoted_root} && test -r {quoted_root}"),
            timeout_seconds=timeout_seconds,
        ),
    ]
    result = {
        "ok": all(check["ok"] for check in checks),
        "checks": checks,
        "target": target,
        "source_root": source_root,
        "rsync_binary": rsync_binary,
        "timeout_seconds": timeout_seconds,
    }
    # Merge into the existing config so unrelated keys are preserved.
    host.config = {**(host.config or {}), "last_preflight": result}
    host.save(update_fields=["config", "updated_at"])
    return result
def effective_host_config_preview(host: HostConfig, global_config: GlobalConfig) -> dict[str, Any]:
    """Return a condensed view of the merged global + host configuration.

    The result contains the source root, destination subdirectory, include and
    exclude lists, and nested ``ssh``, ``rsync`` and ``retention`` summaries.
    The SSH credential shown is the host's own, falling back to the global
    default, rendered with ``str()`` (empty string when neither is set).
    """
    merged = build_effective_config(
        global_config_object_data(global_config),
        host_config_object_data(host),
    )
    credential = host.ssh_credential or global_config.default_ssh_credential

    def section(key: str) -> dict[str, Any]:
        # A missing or falsy section is treated as an empty mapping.
        return merged.get(key, {}) or {}

    ssh_section = section("ssh")
    rsync_section = section("rsync")
    retention_section = section("retention")

    ssh_preview = {
        "user": ssh_section.get("user", ""),
        "port": ssh_section.get("port", ""),
        "options": list(ssh_section.get("options") or []),
        "credential": str(credential) if credential else "",
    }
    rsync_preview = {
        "binary": rsync_section.get("binary", ""),
        "args": list(rsync_section.get("args_effective") or []),
        "timeout_seconds": rsync_section.get("timeout_seconds", 0),
        "bwlimit_kbps": rsync_section.get("bwlimit_kbps", 0),
    }
    retention_preview = {
        period: retention_section.get(period, 0)
        for period in ("daily", "weekly", "monthly", "yearly")
    }

    return {
        "source_root": merged.get("source_root", ""),
        "destination_subdir": section("defaults").get("destination_subdir", ""),
        "includes": list(merged.get("includes") or []),
        "excludes": list(merged.get("excludes_effective") or []),
        "ssh": ssh_preview,
        "rsync": rsync_preview,
        "retention": retention_preview,
    }
def _run_remote_check(*, name: str, command: list[str], timeout_seconds: int) -> dict[str, Any]:
try:
result = subprocess.run(
command,
check=False,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
timeout=timeout_seconds,
)
except subprocess.TimeoutExpired as exc:
return {
"name": name,
"ok": False,
"exit_code": 124,
"message": f"{name} timed out after {timeout_seconds}s.",
"detail": _clip_output((exc.stderr or exc.stdout or "").strip()),
}
except OSError as exc:
return {
"name": name,
"ok": False,
"exit_code": None,
"message": f"{name} could not start.",
"detail": str(exc),
}
return {
"name": name,
"ok": result.returncode == 0,
"exit_code": result.returncode,
"message": f"{name} passed." if result.returncode == 0 else f"{name} failed.",
"detail": _clip_output((result.stderr or result.stdout or "").strip()),
}
def _remote_preflight_self_check(host: HostConfig) -> SelfCheck | None:
    """Turn the preflight result stored on ``host.config`` into a ``SelfCheck``.

    Yields a warning when no result is stored, a failure when the stored
    result is malformed or lists a check that is not ok, and an ok check
    otherwise. Entries in the stored list that are not dicts are skipped.
    """
    stored = (host.config or {}).get("last_preflight")
    if not isinstance(stored, dict):
        return SelfCheck(
            "Remote preflight",
            "warning",
            "No remote connection preflight has been run yet.",
            "Run connection preflight before the first real backup.",
        )

    entries = stored.get("checks")
    if not isinstance(entries, list):
        return SelfCheck("Remote preflight", "failed", "Stored remote preflight result is invalid.")

    # Gather the names of every recorded check that did not pass.
    failed_names: list[str] = []
    for entry in entries:
        if not isinstance(entry, dict):
            continue
        if entry.get("ok"):
            continue
        failed_names.append(str(entry.get("name", "unknown")))

    if failed_names:
        return SelfCheck(
            "Remote preflight",
            "failed",
            "Remote connection preflight failed.",
            ", ".join(failed_names),
        )

    target = stored.get("target", "")
    source_root = stored.get("source_root", "")
    summary = f"{target} {source_root}".strip()
    return SelfCheck(
        "Remote preflight",
        "ok",
        "Remote connection preflight passed.",
        summary,
    )
def _clip_output(value: str, *, max_chars: int = 800) -> str:
if len(value) <= max_chars:
return value
return f"{value[:max_chars]}..."