Files
pobsync/src/pobsync_backend/host_ops.py

70 lines
2.6 KiB
Python
Raw Normal View History

from __future__ import annotations
import os
from pathlib import Path
from pobsync.snapshot_meta import resolve_host_root
from .models import GlobalConfig, HostConfig
from .self_check import SelfCheck
HOST_BACKUP_SUBDIRS = ("scheduled", "manual", ".incomplete")
def ensure_host_directories(host: HostConfig, global_config: GlobalConfig | None = None) -> Path:
global_config = global_config or GlobalConfig.objects.get(name="default")
host_root = resolve_host_root(global_config.backup_root, host.host)
for subdir in HOST_BACKUP_SUBDIRS:
(host_root / subdir).mkdir(parents=True, exist_ok=True)
return host_root
def collect_host_checks(host: HostConfig, global_config: GlobalConfig | None = None) -> list[SelfCheck]:
checks: list[SelfCheck] = []
try:
global_config = global_config or GlobalConfig.objects.get(name="default")
except GlobalConfig.DoesNotExist:
return [SelfCheck("Host global config", "failed", "Default global config does not exist.")]
checks.append(
SelfCheck(
"Host enabled",
"ok" if host.enabled else "warning",
"Host is enabled." if host.enabled else "Host is disabled.",
)
)
checks.append(
SelfCheck(
"Host address",
"ok" if host.address.strip() else "failed",
host.address.strip() or "Host address is empty.",
)
)
credential = host.ssh_credential or global_config.default_ssh_credential
checks.append(
SelfCheck(
"Host SSH credential",
"ok" if credential else "warning",
str(credential) if credential else "No host or global SSH credential selected.",
)
)
host_root = resolve_host_root(global_config.backup_root, host.host)
checks.append(_host_path_check("Host backup root", host_root, must_exist=True, must_be_writable=True))
for subdir in HOST_BACKUP_SUBDIRS:
checks.append(_host_path_check(f"Host directory: {subdir}", host_root / subdir, must_exist=True, must_be_writable=True))
return checks
def _host_path_check(name: str, path: Path, *, must_exist: bool, must_be_writable: bool) -> SelfCheck:
if must_exist and not path.exists():
return SelfCheck(name, "failed", f"{path} does not exist.")
target = path if path.exists() else path.parent
if not target.exists():
return SelfCheck(name, "failed", f"{target} does not exist.")
if must_be_writable and not os.access(target, os.W_OK):
return SelfCheck(name, "failed", f"{target} is not writable by this process.")
return SelfCheck(name, "ok", str(path))