Files
pobsync/src/pobsync_backend/backup_runner.py

355 lines
12 KiB
Python
Raw Normal View History

from __future__ import annotations
import os
import socket
from datetime import timedelta, timezone as datetime_timezone
from pathlib import Path
from django.db import transaction
from django.utils import timezone
from pobsync.commands.run_scheduled import DEFAULT_DRY_RUN_TIMEOUT_SECONDS, classify_rsync_failure, dry_run_log_path, run_scheduled
from pobsync_backend.config_source import DjangoConfigSource
from pobsync_backend.models import BackupRun, HostConfig
from pobsync_backend.retention import run_sql_retention_apply
from pobsync_backend.snapshot_discovery import infer_snapshot_kind, upsert_snapshot_record
def queue_backup_run(
    *,
    host: HostConfig,
    run_type: str = BackupRun.RunType.MANUAL,
    dry_run: bool = False,
    verbose_output: bool = False,
    prune: bool = False,
    prune_max_delete: int = 10,
    prune_protect_bases: bool = False,
) -> BackupRun:
    """Create a QUEUED backup run for *host* and return it.

    The requested options are normalised (``bool``/``int``) and stored under
    ``result["requested"]`` so that a worker can read them back later.
    Verbose output is always requested for dry-runs.
    """
    requested = {
        "dry_run": bool(dry_run),
        # Dry-runs always produce verbose output.
        "verbose_output": bool(dry_run or verbose_output),
        "prune": bool(prune),
        "prune_max_delete": int(prune_max_delete),
        "prune_protect_bases": bool(prune_protect_bases),
    }
    return BackupRun.objects.create(
        host=host,
        run_type=run_type,
        status=BackupRun.Status.QUEUED,
        result={"requested": requested},
    )


def execute_backup_run(
    *,
    run: BackupRun,
    prefix: Path,
    dry_run: bool = False,
    verbose_output: bool = False,
    prune: bool = False,
    prune_max_delete: int = 10,
    prune_protect_bases: bool = False,
) -> BackupRun:
    """Execute *run* synchronously through ``run_scheduled`` and persist the outcome.

    The run is first stored as RUNNING together with worker execution details.
    ``_run_cancel_requested`` is passed as the cancel check, so a cancellation
    written to the database is observed (and the worker heartbeat refreshed)
    while rsync runs.

    When the backup succeeded, was not a dry-run and was not cancelled, SQL
    retention is applied if *prune* is true. A pruning failure is recorded in
    ``result["prune"]`` and downgrades the run to WARNING rather than FAILED.

    Returns:
        The same ``run`` instance, updated and saved.

    Raises:
        Whatever ``run_scheduled`` raises. Before re-raising, the run is stored
        as FAILED, or kept CANCELLED if it had been cancelled.
    """
    run.status = BackupRun.Status.RUNNING
    run.started_at = run.started_at or timezone.now()
    run.result = _running_result(run=run, dry_run=bool(dry_run))
    run.save(update_fields=["status", "started_at", "result"])

    try:
        result = run_scheduled(
            prefix=prefix,
            host=run.host.host,
            dry_run=bool(dry_run),
            # Pruning is done below via SQL retention, not inside run_scheduled.
            prune=False,
            config_source=DjangoConfigSource(),
            run_id=run.id,
            cancel_check=lambda: _run_cancel_requested(run.id),
            verbose_output=bool(dry_run or verbose_output),
        )
    except Exception as exc:
        # Reload so a cancellation requested meanwhile is not overwritten.
        run.refresh_from_db()
        if run.status != BackupRun.Status.CANCELLED:
            run.status = BackupRun.Status.FAILED
        run.ended_at = timezone.now()
        run.result = {
            **(run.result if isinstance(run.result, dict) else {}),
            "ok": False,
            "error": str(exc),
            "type": type(exc).__name__,
        }
        run.save(update_fields=["status", "ended_at", "result"])
        raise

    # Reload to pick up a cancellation requested while rsync was running.
    run.refresh_from_db()
    if result.get("cancelled") or run.status == BackupRun.Status.CANCELLED:
        run.status = BackupRun.Status.CANCELLED
    else:
        run.status = BackupRun.Status.SUCCESS if result.get("ok") else BackupRun.Status.FAILED
    run.ended_at = timezone.now()
    run.snapshot_path = str(result.get("snapshot") or "")
    run.base_path = str(result.get("base") or "")
    rsync = result.get("rsync") if isinstance(result.get("rsync"), dict) else {}
    run.rsync_exit_code = rsync.get("exit_code")

    snapshot_record = None
    if run.snapshot_path:
        snapshot_path = Path(run.snapshot_path)
        try:
            kind = infer_snapshot_kind(snapshot_path)
            snapshot_record, _created = upsert_snapshot_record(
                host=run.host, kind=kind, snapshot_dir=snapshot_path
            )
        except ValueError:
            # Unrecognised snapshot layout: keep the path, skip the DB record.
            snapshot_record = None

    # SUCCESS already implies ``result["ok"]`` and "not cancelled"; a run the
    # user cancelled must not go on to delete older snapshots.
    if prune and run.status == BackupRun.Status.SUCCESS and not result.get("dry_run"):
        try:
            result["prune"] = run_sql_retention_apply(
                prefix=prefix,
                host=run.host.host,
                kind="scheduled",
                protect_bases=bool(prune_protect_bases),
                yes=True,
                max_delete=int(prune_max_delete),
                action=run.run_type,
                acquire_lock=False,
            )
        except Exception as exc:
            # The backup itself succeeded; report the retention failure only.
            result["prune"] = {"ok": False, "error": str(exc), "type": type(exc).__name__}
            run.status = BackupRun.Status.WARNING

    # Persist everything in a single write (the original saved twice in a row).
    run.result = result
    run.snapshot = snapshot_record
    run.save(
        update_fields=[
            "status",
            "ended_at",
            "snapshot_path",
            "snapshot",
            "base_path",
            "rsync_exit_code",
            "result",
        ],
    )
    return run


def claim_next_queued_run() -> BackupRun | None:
    """Atomically claim the oldest QUEUED run whose host is enabled.

    The status flip is a compare-and-set: the UPDATE only matches while the row
    is still QUEUED. Two workers polling at the same time therefore can never
    both start the same run. A plain read-then-``save()`` inside
    ``transaction.atomic()`` takes no row lock and allowed that. If another
    worker wins the race for a run, the next queued run is tried.

    Returns:
        The claimed run (status RUNNING, ``started_at`` set, ``host`` already
        loaded), or ``None`` when nothing is queued.
    """
    while True:
        with transaction.atomic():
            run = (
                BackupRun.objects.select_related("host")
                .filter(status=BackupRun.Status.QUEUED, host__enabled=True)
                .order_by("created_at", "id")
                .first()
            )
            if run is None:
                return None
            started_at = timezone.now()
            # NOTE(review): QuerySet.update() bypasses Model.save() and the
            # pre/post_save signals; confirm nothing relies on them here.
            claimed = BackupRun.objects.filter(
                id=run.id, status=BackupRun.Status.QUEUED
            ).update(status=BackupRun.Status.RUNNING, started_at=started_at)
            if claimed:
                run.status = BackupRun.Status.RUNNING
                run.started_at = started_at
                return run
        # Another worker claimed this run first. Its status is no longer
        # QUEUED, so the next iteration moves on to a different run.


def reconcile_running_runs(*, grace_seconds: int = 300, stale_worker_seconds: int = 24 * 60 * 60) -> int:
    """Reconcile every RUNNING run, oldest first, and return how many were updated.

    Each run is passed to ``_reconcile_running_run``. The count covers the runs
    for which that helper reported a change.
    """
    running = (
        BackupRun.objects.select_related("host")
        .filter(status=BackupRun.Status.RUNNING)
        .order_by("started_at", "id")
    )
    return sum(
        1
        for candidate in running
        if _reconcile_running_run(
            run=candidate,
            grace_seconds=grace_seconds,
            stale_worker_seconds=stale_worker_seconds,
        )
    )


def requested_options(run: BackupRun) -> dict[str, object]:
    """Return the options stored under ``run.result["requested"]``.

    Returns the stored dict itself (not a copy). Returns ``{}`` when
    ``run.result`` is not a dict or the ``requested`` entry is missing or not a
    dict.
    """
    payload = run.result
    if not isinstance(payload, dict):
        return {}
    options = payload.get("requested")
    return options if isinstance(options, dict) else {}


def _running_result(*, run: BackupRun, dry_run: bool) -> dict[str, object]:
    """Return a shallow copy of ``run.result`` with a fresh ``execution`` entry.

    The ``execution`` entry holds the worker details, the run start time, an
    initial heartbeat and, for dry-runs, the path of the dry-run log.
    ``run.result`` itself is not modified.
    """
    execution = _worker_execution_details()
    execution["started_at"] = (run.started_at or timezone.now()).isoformat()
    execution["heartbeat_at"] = timezone.now().isoformat()
    if dry_run:
        execution["log"] = str(dry_run_log_path(run.host.host, run_id=run.id))

    merged = {**run.result} if isinstance(run.result, dict) else {}
    merged["execution"] = execution
    return merged


def _run_cancel_requested(run_id: int) -> bool:
    """Cancel-check callback given to ``run_scheduled``.

    Returns ``True`` if the run row no longer exists or is marked CANCELLED.
    As a side effect, while the run is still RUNNING its worker heartbeat is
    refreshed (``_refresh_run_heartbeat`` throttles the actual writes).
    """
    try:
        current = BackupRun.objects.only("id", "status", "result").get(id=run_id)
    except BackupRun.DoesNotExist:
        # A deleted run is treated as cancelled.
        return True

    status = current.status
    if status == BackupRun.Status.CANCELLED:
        return True
    if status == BackupRun.Status.RUNNING:
        _refresh_run_heartbeat(current)
    return False


def _reconcile_running_run(*, run: BackupRun, grace_seconds: int, stale_worker_seconds: int) -> bool:
    """Mark a RUNNING *run* as FAILED when it can no longer be making progress.

    A real backup is failed only when its worker heartbeat has gone stale.
    A dry-run is also failed in two other cases:

    * its execution log contains an ``rsync error:`` line, or
    * it has outlived its timeout plus *grace_seconds*.

    For a failed dry-run, the exit code and failure details are rebuilt from
    the log tail.

    Returns:
        ``True`` if the run was marked FAILED and saved, ``False`` if it was
        left untouched.
    """
    payload = run.result if isinstance(run.result, dict) else {}
    requested = payload.get("requested")
    if not isinstance(requested, dict):
        requested = {}
    worker_gone = _running_worker_timed_out(run=run, stale_worker_seconds=stale_worker_seconds)

    if not requested.get("dry_run"):
        # No dry-run log to inspect: a stale heartbeat is the only evidence
        # that a real backup has died.
        if not worker_gone:
            return False
        payload.update(
            {
                "ok": False,
                "host": run.host.host,
                "failure": {
                    "category": "worker",
                    "message": "The worker heartbeat stopped before the run finished.",
                    "hint": "Check pobsync-worker.service logs before retrying the backup.",
                },
            }
        )
        run.status = BackupRun.Status.FAILED
        run.ended_at = timezone.now()
        run.result = payload
        run.save(update_fields=["status", "ended_at", "result"])
        return True

    log_path = _execution_log_path(payload)
    tail = [] if log_path is None else _read_log_tail(log_path)
    rsync_failed = _terminal_rsync_log(tail)
    overdue = _running_dry_run_timed_out(run=run, grace_seconds=grace_seconds)
    if not (rsync_failed or overdue or worker_gone):
        return False

    # When the log names no recognised code, 124 is used for timeouts and
    # stalled workers (presumably mirroring timeout(1)); otherwise 255.
    exit_code = _exit_code_from_log(tail) or (124 if overdue or worker_gone else 255)
    failure = classify_rsync_failure(exit_code, tail)
    if worker_gone and not rsync_failed:
        failure = {
            "category": "worker",
            "message": "The worker heartbeat stopped before the dry-run finished.",
            "hint": "Check pobsync-worker.service logs before retrying the dry-run.",
        }

    previous_rsync = payload.get("rsync")
    payload.update(
        {
            "ok": False,
            "dry_run": True,
            "host": run.host.host,
            "base": payload.get("base"),
            "log": str(log_path) if log_path else "",
            "failure": failure,
            "rsync": {
                **(previous_rsync if isinstance(previous_rsync, dict) else {}),
                "exit_code": exit_code,
                "log_tail": tail,
            },
        }
    )
    run.status = BackupRun.Status.FAILED
    run.ended_at = timezone.now()
    run.rsync_exit_code = exit_code
    run.result = payload
    run.save(update_fields=["status", "ended_at", "rsync_exit_code", "result"])
    return True


def _worker_execution_details() -> dict[str, object]:
    """Describe the current worker process and the moment it took a run.

    Returns the process id, the host name and an ISO-formatted ``claimed_at``
    timestamp.
    """
    details: dict[str, object] = {}
    details["worker_pid"] = os.getpid()
    details["worker_host"] = socket.gethostname()
    details["claimed_at"] = timezone.now().isoformat()
    return details


def _refresh_run_heartbeat(run: BackupRun, *, interval_seconds: int = 30) -> None:
    """Write a new heartbeat into ``run.result["execution"]``, at most once per interval.

    Nothing happens while the stored ``heartbeat_at`` is younger than
    *interval_seconds*. Otherwise the worker pid/host and the current time are
    recorded, and only the ``result`` field is saved.
    """
    payload = run.result if isinstance(run.result, dict) else {}
    raw_execution = payload.get("execution")
    execution = raw_execution if isinstance(raw_execution, dict) else {}

    last_beat = _parse_iso_datetime(execution.get("heartbeat_at"))
    if last_beat is not None:
        due_at = last_beat + timedelta(seconds=interval_seconds)
        if timezone.now() < due_at:
            return

    payload["execution"] = {
        **execution,
        "worker_pid": os.getpid(),
        "worker_host": socket.gethostname(),
        "heartbeat_at": timezone.now().isoformat(),
    }
    run.result = payload
    run.save(update_fields=["result"])


def _execution_log_path(result: dict[str, object]) -> Path | None:
execution = result.get("execution") if isinstance(result.get("execution"), dict) else {}
log = execution.get("log") or result.get("log")
if not isinstance(log, str) or not log:
return None
return Path(log)
def _read_log_tail(log_path: Path | None, *, max_lines: int = 40) -> list[str]:
if log_path is None:
return []
try:
return log_path.read_text(encoding="utf-8", errors="replace").splitlines()[-max_lines:]
except OSError:
return []
def _terminal_rsync_log(log_tail: list[str]) -> bool:
return any(line.startswith("rsync error:") for line in log_tail)
def _exit_code_from_log(log_tail: list[str]) -> int | None:
for line in reversed(log_tail):
if "code 255" in line:
return 255
if "code 124" in line:
return 124
if "code 12" in line:
return 12
return None
def _running_dry_run_timed_out(*, run: BackupRun, grace_seconds: int) -> bool:
if run.started_at is None:
return False
result = run.result if isinstance(run.result, dict) else {}
timeout_seconds = result.get("timeout_seconds")
if not isinstance(timeout_seconds, int) or timeout_seconds <= 0:
timeout_seconds = DEFAULT_DRY_RUN_TIMEOUT_SECONDS
return timezone.now() >= run.started_at + timedelta(seconds=timeout_seconds + grace_seconds)
def _running_worker_timed_out(*, run: BackupRun, stale_worker_seconds: int) -> bool:
if stale_worker_seconds <= 0:
return False
result = run.result if isinstance(run.result, dict) else {}
execution = result.get("execution") if isinstance(result.get("execution"), dict) else {}
heartbeat_at = _parse_iso_datetime(execution.get("heartbeat_at"))
if heartbeat_at is None:
heartbeat_at = run.started_at
if heartbeat_at is None:
return False
return timezone.now() >= heartbeat_at + timedelta(seconds=stale_worker_seconds)
def _parse_iso_datetime(value: object):
if not isinstance(value, str) or not value:
return None
try:
parsed = timezone.datetime.fromisoformat(value)
except ValueError:
return None
if timezone.is_naive(parsed):
return timezone.make_aware(parsed, timezone=datetime_timezone.utc)
return parsed