add new doctor commands

This commit is contained in:
2026-02-03 16:29:51 +01:00
parent 4827889f26
commit 795fe38888
2 changed files with 334 additions and 1 deletions

View File

@@ -2,14 +2,17 @@ from __future__ import annotations
import os
import shutil
import subprocess
from pathlib import Path
from typing import Any
from ..config.load import load_global_config, load_host_config
from ..errors import DoctorError
from ..paths import PobsyncPaths
from ..util import is_absolute_non_root
CRON_FILE_DEFAULT = Path("/etc/cron.d/pobsync")
LOG_DIR_DEFAULT = Path("/var/log/pobsync")
def _check_binary(name: str) -> tuple[bool, str]:
p = shutil.which(name)
@@ -32,6 +35,77 @@ def _check_writable_dir(path: Path) -> tuple[bool, str]:
return True, f"ok: writable {path}"
def _run(cmd: list[str]) -> subprocess.CompletedProcess[str]:
return subprocess.run(
cmd,
check=False,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
)
def _check_cron_service() -> tuple[bool, str]:
"""
Best-effort check: verify cron service is active on systemd hosts.
If systemctl is missing, we don't fail doctor phase 1.
"""
systemctl = shutil.which("systemctl")
if not systemctl:
return True, "ok: systemctl not found; cannot verify cron service status"
for svc in ("cron", "crond"):
cp = _run([systemctl, "is-active", svc])
if cp.returncode == 0 and cp.stdout.strip() == "active":
return True, f"ok: cron service active ({svc})"
return False, "cron service not active (tried: cron, crond)"
def _check_cron_file_permissions(path: Path) -> tuple[bool, str]:
"""
/etc/cron.d files must not be writable by group/other.
Owner should be root.
Mode can be 0600 or 0644 (both ok as long as not group/other-writable).
"""
if not path.exists():
return True, f"ok: cron file not present ({path}); schedule may not be configured yet"
try:
st = path.stat()
except OSError as e:
return False, f"cannot stat cron file {path}: {e}"
if not path.is_file():
return False, f"cron file is not a regular file: {path}"
problems: list[str] = []
# root owner
if st.st_uid != 0:
problems.append("owner is not root")
# must not be group/other writable
if (st.st_mode & 0o022) != 0:
problems.append("writable by group/other")
if problems:
mode_octal = oct(st.st_mode & 0o777)
return False, f"unsafe cron file permissions/ownership for {path} (mode={mode_octal}): {', '.join(problems)}"
mode_octal = oct(st.st_mode & 0o777)
return True, f"ok: cron file permissions/ownership OK ({path}, mode={mode_octal})"
def _check_pobsync_executable(prefix: Path) -> tuple[bool, str]:
exe = prefix / "bin" / "pobsync"
if not exe.exists():
return False, f"missing executable: {exe}"
if not os.access(str(exe), os.X_OK):
return False, f"not executable: {exe}"
return True, f"ok: executable {exe}"
def run_doctor(prefix: Path, host: str | None, connect: bool, rsync_dry_run: bool) -> dict[str, Any]:
# Phase 1 doctor does not perform network checks yet (connect/rsync_dry_run acknowledged).
paths = PobsyncPaths(home=prefix)
@@ -63,6 +137,7 @@ def run_doctor(prefix: Path, host: str | None, connect: bool, rsync_dry_run: boo
b1, m1 = _check_binary("rsync")
results.append({"check": "binary", "name": "rsync", "ok": b1, "message": m1})
ok = ok and b1
b2, m2 = _check_binary("ssh")
results.append({"check": "binary", "name": "ssh", "ok": b2, "message": m2})
ok = ok and b2
@@ -81,6 +156,36 @@ def run_doctor(prefix: Path, host: str | None, connect: bool, rsync_dry_run: boo
else:
results.append({"check": "backup_root", "ok": False, "error": "global config not loaded"})
# ---- Scheduling checks (Step 1) ----
c_ok, c_msg = _check_cron_service()
results.append({"check": "schedule_cron_service", "ok": c_ok, "message": c_msg})
ok = ok and c_ok
f_ok, f_msg = _check_cron_file_permissions(CRON_FILE_DEFAULT)
results.append({"check": "schedule_cron_file", "path": str(CRON_FILE_DEFAULT), "ok": f_ok, "message": f_msg})
ok = ok and f_ok
# We treat missing log dir as a warning rather than hard-fail in phase 1:
# cron redirection may fail, but backups can still run.
if LOG_DIR_DEFAULT.exists():
l_ok, l_msg = _check_writable_dir(LOG_DIR_DEFAULT)
results.append({"check": "schedule_log_dir", "path": str(LOG_DIR_DEFAULT), "ok": l_ok, "message": l_msg})
ok = ok and l_ok
else:
results.append(
{
"check": "schedule_log_dir",
"path": str(LOG_DIR_DEFAULT),
"ok": True,
"message": f"ok: log dir does not exist ({LOG_DIR_DEFAULT}); cron redirection may fail (backlog: create in install)",
}
)
e_ok, e_msg = _check_pobsync_executable(prefix)
results.append({"check": "schedule_pobsync_executable", "path": str(prefix / "bin" / "pobsync"), "ok": e_ok, "message": e_msg})
ok = ok and e_ok
# host checks
if host is not None:
host_path = paths.hosts_dir / f"{host}.yaml"

View File

@@ -0,0 +1,228 @@
from __future__ import annotations
import os
import shutil
import stat
import subprocess
from dataclasses import dataclass
from typing import Any, Dict, List, Optional
CRON_FILE_DEFAULT = "/etc/cron.d/pobsync"
LOG_DIR_DEFAULT = "/var/log/pobsync"
@dataclass(frozen=True)
class DoctorCheck:
name: str
ok: bool
severity: str # "error" | "warning" | "info"
message: str
details: Optional[Dict[str, Any]] = None
def _run(cmd: List[str]) -> subprocess.CompletedProcess[str]:
return subprocess.run(
cmd,
check=False,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
)
def _check_cron_service() -> DoctorCheck:
systemctl = shutil.which("systemctl")
if not systemctl:
return DoctorCheck(
name="schedule.cron_service",
ok=True,
severity="warning",
message="systemctl not found; cannot verify cron service status",
details={"hint": "If cron isn't running, schedules won't execute."},
)
# Try both common service names
for svc in ("cron", "crond"):
cp = _run([systemctl, "is-active", svc])
if cp.returncode == 0 and cp.stdout.strip() == "active":
return DoctorCheck(
name="schedule.cron_service",
ok=True,
severity="info",
message=f"cron service is active ({svc})",
)
# Not active / unknown
return DoctorCheck(
name="schedule.cron_service",
ok=False,
severity="error",
message="cron service is not active (tried: cron, crond)",
details={"hint": "Enable/start cron (systemctl enable --now cron) or the equivalent on your distro."},
)
def _check_cron_file_permissions(cron_file: str) -> DoctorCheck:
try:
st = os.stat(cron_file)
except FileNotFoundError:
return DoctorCheck(
name="schedule.cron_file",
ok=True,
severity="warning",
message=f"cron file not found: {cron_file}",
details={"hint": "Create one via: pobsync schedule create <host> ..."},
)
except OSError as e:
return DoctorCheck(
name="schedule.cron_file",
ok=False,
severity="error",
message=f"cannot stat cron file: {cron_file}",
details={"error": str(e)},
)
if not stat.S_ISREG(st.st_mode):
return DoctorCheck(
name="schedule.cron_file",
ok=False,
severity="error",
message=f"cron file is not a regular file: {cron_file}",
)
problems: List[str] = []
if st.st_uid != 0:
problems.append("owner is not root")
# For /etc/cron.d, file must NOT be group/other writable.
# (Mode may be 600 or 644; both are fine as long as not writable by group/other.)
if (st.st_mode & 0o022) != 0:
problems.append("cron file is writable by group/other (must not be)")
mode_octal = oct(st.st_mode & 0o777)
if problems:
return DoctorCheck(
name="schedule.cron_file",
ok=False,
severity="error",
message=f"cron file permissions/ownership look unsafe: {cron_file}",
details={"mode": mode_octal, "uid": st.st_uid, "problems": problems},
)
return DoctorCheck(
name="schedule.cron_file",
ok=True,
severity="info",
message=f"cron file permissions/ownership OK: {cron_file}",
details={"mode": mode_octal},
)
def _check_log_dir(log_dir: str) -> DoctorCheck:
if not os.path.exists(log_dir):
return DoctorCheck(
name="schedule.log_dir",
ok=True,
severity="warning",
message=f"log directory does not exist: {log_dir}",
details={"hint": "Not fatal, but cron output redirection may fail. Backlog item: create in install."},
)
if not os.path.isdir(log_dir):
return DoctorCheck(
name="schedule.log_dir",
ok=False,
severity="error",
message=f"log path exists but is not a directory: {log_dir}",
)
if not os.access(log_dir, os.W_OK):
return DoctorCheck(
name="schedule.log_dir",
ok=False,
severity="error",
message=f"log directory is not writable: {log_dir}",
)
return DoctorCheck(
name="schedule.log_dir",
ok=True,
severity="info",
message=f"log directory OK: {log_dir}",
)
def _check_pobsync_executable(prefix: str) -> DoctorCheck:
exe = os.path.join(prefix, "bin", "pobsync")
if not os.path.exists(exe):
return DoctorCheck(
name="schedule.pobsync_executable",
ok=False,
severity="error",
message=f"pobsync executable not found at {exe}",
details={"hint": "Your cron entry likely points here; verify /opt/pobsync installation."},
)
if not os.access(exe, os.X_OK):
return DoctorCheck(
name="schedule.pobsync_executable",
ok=False,
severity="error",
message=f"pobsync exists but is not executable: {exe}",
)
return DoctorCheck(
name="schedule.pobsync_executable",
ok=True,
severity="info",
message=f"pobsync executable OK: {exe}",
)
def scheduling_checks(prefix: str, cron_file: str = CRON_FILE_DEFAULT) -> List[DoctorCheck]:
return [
_check_cron_service(),
_check_cron_file_permissions(cron_file),
_check_log_dir(LOG_DIR_DEFAULT),
_check_pobsync_executable(prefix),
]
def extend_doctor_result(result: Dict[str, Any], *, prefix: str, cron_file: str = CRON_FILE_DEFAULT) -> Dict[str, Any]:
"""
Add scheduling-related checks into an existing doctor result dict.
This is designed to be additive and low-risk:
- If result has a "checks" list, we append items.
- If result has "ok", we AND it with any error-level failures.
"""
checks = scheduling_checks(prefix=prefix, cron_file=cron_file)
# Normalize result structure
existing = result.get("checks")
if not isinstance(existing, list):
existing = []
result["checks"] = existing
for c in checks:
existing.append(
{
"name": c.name,
"ok": c.ok,
"severity": c.severity,
"message": c.message,
"details": c.details or {},
}
)
# Update overall ok: errors make it false; warnings do not.
overall_ok = bool(result.get("ok", True))
for c in checks:
if c.severity == "error" and not c.ok:
overall_ok = False
result["ok"] = overall_ok
return result