Record snapshot purge history whenever retention or incomplete cleanup removes snapshot directories and SQL records. Store the purge reason, original kind, path, action source, and triggering operator so manual, scheduled, CLI, and incomplete cleanup actions remain auditable after the original snapshot record is deleted. Add a staff-only Purged Snapshots page with host/action filters and register the audit model in Django admin. Refs #16 Refs #8
355 lines
12 KiB
Python
355 lines
12 KiB
Python
from __future__ import annotations

import os
import re
import socket
from datetime import datetime, timedelta, timezone as datetime_timezone
from pathlib import Path

from django.db import transaction
from django.utils import timezone

from pobsync.commands.run_scheduled import DEFAULT_DRY_RUN_TIMEOUT_SECONDS, classify_rsync_failure, dry_run_log_path, run_scheduled
from pobsync_backend.config_source import DjangoConfigSource
from pobsync_backend.models import BackupRun, HostConfig
from pobsync_backend.retention import run_sql_retention_apply
from pobsync_backend.snapshot_discovery import infer_snapshot_kind, upsert_snapshot_record
|
|
|
|
|
|
def queue_backup_run(
    *,
    host: HostConfig,
    run_type: str = BackupRun.RunType.MANUAL,
    dry_run: bool = False,
    verbose_output: bool = False,
    prune: bool = False,
    prune_max_delete: int = 10,
    prune_protect_bases: bool = False,
) -> BackupRun:
    """Create and return a new ``BackupRun`` row in the QUEUED state.

    The options passed by the caller are coerced to plain ``bool``/``int``
    values and stored under ``result["requested"]`` on the new row.

    Args:
        host: Host the run belongs to.
        run_type: Value stored in the run's ``run_type`` field.
        dry_run: Whether a dry run was requested.
        verbose_output: Whether verbose output was requested.
        prune: Whether pruning was requested.
        prune_max_delete: Requested upper bound for deletions while pruning.
        prune_protect_bases: Whether base snapshots should be protected.

    Returns:
        The freshly created ``BackupRun``.
    """
    # A dry run is always recorded as verbose, whatever the caller asked for.
    wants_verbose = bool(dry_run or verbose_output)
    requested = {
        "dry_run": bool(dry_run),
        "verbose_output": wants_verbose,
        "prune": bool(prune),
        "prune_max_delete": int(prune_max_delete),
        "prune_protect_bases": bool(prune_protect_bases),
    }
    return BackupRun.objects.create(
        host=host,
        run_type=run_type,
        status=BackupRun.Status.QUEUED,
        result={"requested": requested},
    )
|
|
|
|
|
|
def execute_backup_run(
    *,
    run: BackupRun,
    prefix: Path,
    dry_run: bool = False,
    verbose_output: bool = False,
    prune: bool = False,
    prune_max_delete: int = 10,
    prune_protect_bases: bool = False,
) -> BackupRun:
    """Execute one backup run and persist its outcome on ``run``.

    Steps:

    1. Mark the run RUNNING, attach execution metadata and save.
    2. Call ``run_scheduled`` (always with ``prune=False``; retention is
       applied separately in step 5).
    3. If ``run_scheduled`` raises, reload the row, store the error in
       ``result``, mark the run CANCELLED when the reloaded row is CANCELLED
       and FAILED otherwise, save, and re-raise.
    4. Otherwise reload the row and derive the final status: CANCELLED when
       the result reports ``cancelled`` or the reloaded row is CANCELLED,
       else SUCCESS/FAILED from ``result["ok"]``. Snapshot path, base path
       and rsync exit code are copied from the result, and the snapshot
       record is upserted when a snapshot path is present.
    5. When the run succeeded, was not a dry run and ``prune`` is set, apply
       SQL retention. A retention error is stored under ``result["prune"]``
       and downgrades the status to WARNING instead of propagating.
    6. Save the final state once.

    Args:
        run: The run to execute; mutated and saved in place.
        prefix: Path forwarded to ``run_scheduled`` and to retention.
        dry_run: Forwarded to ``run_scheduled``; also forces verbose output.
        verbose_output: Request verbose output for a non-dry run.
        prune: Apply retention after a successful, non-dry run.
        prune_max_delete: Forwarded to retention as ``max_delete``.
        prune_protect_bases: Forwarded to retention as ``protect_bases``.

    Returns:
        The same ``run`` instance with its final state saved.

    Raises:
        Exception: Whatever ``run_scheduled`` raised, after the failure has
            been recorded on the run.
    """
    run.status = BackupRun.Status.RUNNING
    run.started_at = run.started_at or timezone.now()
    run.result = _running_result(run=run, dry_run=bool(dry_run))
    run.save(update_fields=["status", "started_at", "result"])

    try:
        result = run_scheduled(
            prefix=prefix,
            host=run.host.host,
            dry_run=bool(dry_run),
            prune=False,
            config_source=DjangoConfigSource(),
            run_id=run.id,
            cancel_check=lambda: _run_cancel_requested(run.id),
            verbose_output=bool(dry_run or verbose_output),
        )
    except Exception as exc:
        # Reload first so a status change stored while the run was executing
        # (for example a cancellation) is not overwritten.
        run.refresh_from_db()
        run.status = BackupRun.Status.CANCELLED if run.status == BackupRun.Status.CANCELLED else BackupRun.Status.FAILED
        run.ended_at = timezone.now()
        run.result = {
            **(run.result if isinstance(run.result, dict) else {}),
            "ok": False,
            "error": str(exc),
            "type": type(exc).__name__,
        }
        run.save(update_fields=["status", "ended_at", "result"])
        raise

    run.refresh_from_db()
    if result.get("cancelled") or run.status == BackupRun.Status.CANCELLED:
        run.status = BackupRun.Status.CANCELLED
    else:
        run.status = BackupRun.Status.SUCCESS if result.get("ok") else BackupRun.Status.FAILED
    run.ended_at = timezone.now()
    run.snapshot_path = str(result.get("snapshot") or "")
    run.base_path = str(result.get("base") or "")
    rsync = result.get("rsync") if isinstance(result.get("rsync"), dict) else {}
    run.rsync_exit_code = rsync.get("exit_code")

    snapshot_record = None
    if run.snapshot_path:
        snapshot_path = Path(run.snapshot_path)
        try:
            kind = infer_snapshot_kind(snapshot_path)
            snapshot_record, _created = upsert_snapshot_record(host=run.host, kind=kind, snapshot_dir=snapshot_path)
        except ValueError:
            # A ValueError here leaves the run without a linked snapshot record.
            snapshot_record = None

    if result.get("ok") and not result.get("dry_run") and prune:
        try:
            result["prune"] = run_sql_retention_apply(
                prefix=prefix,
                host=run.host.host,
                kind="scheduled",
                protect_bases=bool(prune_protect_bases),
                yes=True,
                max_delete=int(prune_max_delete),
                action=run.run_type,
                # NOTE(review): presumably the caller already holds the host
                # lock at this point - confirm.
                acquire_lock=False,
            )
        except Exception as exc:
            # The backup itself succeeded; record the retention failure and
            # downgrade the run to WARNING rather than failing it.
            result["prune"] = {"ok": False, "error": str(exc), "type": type(exc).__name__}
            run.status = BackupRun.Status.WARNING

    # Persist the final state once (the original repeated this identical
    # save twice in a row).
    run.snapshot = snapshot_record
    run.result = result
    run.save(
        update_fields=[
            "status",
            "ended_at",
            "snapshot_path",
            "snapshot",
            "base_path",
            "rsync_exit_code",
            "result",
        ],
    )
    return run
|
|
|
|
|
|
def claim_next_queued_run() -> BackupRun | None:
    """Claim the oldest queued run of an enabled host and mark it RUNNING.

    Queued runs are ordered by ``created_at`` and then ``id``. The first one
    is switched to RUNNING with ``started_at`` set to the current time, all
    inside a single transaction.

    Returns:
        The claimed run, or ``None`` when no queued run is available.
    """
    queued = (
        BackupRun.objects.select_related("host")
        .filter(status=BackupRun.Status.QUEUED, host__enabled=True)
        .order_by("created_at", "id")
    )
    with transaction.atomic():
        # NOTE(review): the row is read without select_for_update(), so two
        # workers polling at the same moment could pick the same run -
        # confirm whether more than one worker can be active.
        candidate = queued.first()
        if candidate is None:
            return None
        candidate.status = BackupRun.Status.RUNNING
        candidate.started_at = timezone.now()
        candidate.save(update_fields=["status", "started_at"])
    return candidate
|
|
|
|
|
|
def reconcile_running_runs(*, grace_seconds: int = 300, stale_worker_seconds: int = 24 * 60 * 60) -> int:
    """Reconcile every run that is still marked RUNNING.

    Runs are visited ordered by ``started_at`` and then ``id``; each one is
    handed to ``_reconcile_running_run``.

    Args:
        grace_seconds: Forwarded to ``_reconcile_running_run``.
        stale_worker_seconds: Forwarded to ``_reconcile_running_run``.

    Returns:
        How many runs ``_reconcile_running_run`` reported as reconciled.
    """
    still_running = (
        BackupRun.objects.select_related("host")
        .filter(status=BackupRun.Status.RUNNING)
        .order_by("started_at", "id")
    )
    return sum(
        1
        for run in still_running
        if _reconcile_running_run(
            run=run,
            grace_seconds=grace_seconds,
            stale_worker_seconds=stale_worker_seconds,
        )
    )
|
|
|
|
|
|
def requested_options(run: BackupRun) -> dict[str, object]:
    """Return the options stored under ``run.result["requested"]``.

    Returns:
        The stored mapping itself (not a copy), or an empty dict when
        ``run.result`` is not a dict or holds no dict under ``"requested"``.
    """
    payload = run.result
    if isinstance(payload, dict):
        options = payload.get("requested")
        if isinstance(options, dict):
            return options
    return {}
|
|
|
|
|
|
def _running_result(*, run: BackupRun, dry_run: bool) -> dict[str, object]:
    """Build the ``result`` payload stored on a run when it starts executing.

    A shallow copy of the existing ``run.result`` (or an empty dict when it
    is not a dict) gets a fresh ``"execution"`` entry holding the worker
    details, the start time and an initial heartbeat. For dry runs the
    entry also records the dry-run log path.
    """
    payload = dict(run.result) if isinstance(run.result, dict) else {}

    execution = dict(_worker_execution_details())
    execution["started_at"] = (run.started_at or timezone.now()).isoformat()
    execution["heartbeat_at"] = timezone.now().isoformat()
    if dry_run:
        execution["log"] = str(dry_run_log_path(run.host.host, run_id=run.id))

    payload["execution"] = execution
    return payload
|
|
|
|
|
|
def _run_cancel_requested(run_id: int) -> bool:
    """Report whether the run with ``run_id`` should stop.

    Returns ``True`` when the row no longer exists or its status is
    CANCELLED. While the row is still RUNNING its heartbeat is refreshed as
    a side effect and ``False`` is returned.
    """
    try:
        current = BackupRun.objects.only("id", "status", "result").get(id=run_id)
    except BackupRun.DoesNotExist:
        # A run that has disappeared is treated like a cancelled one.
        return True

    cancelled = current.status == BackupRun.Status.CANCELLED
    if not cancelled and current.status == BackupRun.Status.RUNNING:
        _refresh_run_heartbeat(current)
    return cancelled
|
|
|
|
|
|
def _reconcile_running_run(*, run: BackupRun, grace_seconds: int, stale_worker_seconds: int) -> bool:
    """Mark a RUNNING run as FAILED when it evidently is no longer running.

    Non-dry runs are failed only when the worker heartbeat is stale.
    Dry runs are failed when the log tail contains an rsync error line, the
    dry-run timeout (plus ``grace_seconds``) has passed, or the heartbeat is
    stale.

    Returns:
        ``True`` when the run was updated and saved, ``False`` when it was
        left untouched.
    """
    payload = run.result if isinstance(run.result, dict) else {}
    requested = payload.get("requested")
    if not isinstance(requested, dict):
        requested = {}
    heartbeat_stale = _running_worker_timed_out(run=run, stale_worker_seconds=stale_worker_seconds)

    if not requested.get("dry_run"):
        if not heartbeat_stale:
            return False
        worker_failure = {
            "category": "worker",
            "message": "The worker heartbeat stopped before the run finished.",
            "hint": "Check pobsync-worker.service logs before retrying the backup.",
        }
        payload.update({"ok": False, "host": run.host.host, "failure": worker_failure})
        run.status = BackupRun.Status.FAILED
        run.ended_at = timezone.now()
        run.result = payload
        run.save(update_fields=["status", "ended_at", "result"])
        return True

    log_path = _execution_log_path(payload)
    log_tail = [] if log_path is None else _read_log_tail(log_path)
    saw_rsync_error = _terminal_rsync_log(log_tail)
    deadline_passed = _running_dry_run_timed_out(run=run, grace_seconds=grace_seconds)
    if not (saw_rsync_error or deadline_passed or heartbeat_stale):
        return False

    # Prefer the exit code found in the log; otherwise fall back to 124 for
    # a timeout or stale worker and to 255 in the remaining case.
    fallback_code = 124 if (deadline_passed or heartbeat_stale) else 255
    exit_code = _exit_code_from_log(log_tail) or fallback_code
    failure = classify_rsync_failure(exit_code, log_tail)
    if heartbeat_stale and not saw_rsync_error:
        failure = {
            "category": "worker",
            "message": "The worker heartbeat stopped before the dry-run finished.",
            "hint": "Check pobsync-worker.service logs before retrying the dry-run.",
        }

    previous_rsync = payload.get("rsync")
    rsync_details = {
        **(previous_rsync if isinstance(previous_rsync, dict) else {}),
        "exit_code": exit_code,
        "log_tail": log_tail,
    }
    payload.update(
        {
            "ok": False,
            "dry_run": True,
            "host": run.host.host,
            "base": payload.get("base"),
            "log": str(log_path) if log_path else "",
            "failure": failure,
            "rsync": rsync_details,
        }
    )
    run.status = BackupRun.Status.FAILED
    run.ended_at = timezone.now()
    run.rsync_exit_code = exit_code
    run.result = payload
    run.save(update_fields=["status", "ended_at", "rsync_exit_code", "result"])
    return True
|
|
|
|
|
|
def _worker_execution_details() -> dict[str, object]:
    """Describe the current worker process: pid, hostname and claim time."""
    details: dict[str, object] = {}
    details["worker_pid"] = os.getpid()
    details["worker_host"] = socket.gethostname()
    details["claimed_at"] = timezone.now().isoformat()
    return details
|
|
|
|
|
|
def _refresh_run_heartbeat(run: BackupRun, *, interval_seconds: int = 30) -> None:
    """Write a fresh heartbeat into ``run.result["execution"]``.

    Nothing is written while the stored heartbeat is younger than
    ``interval_seconds``. Otherwise the worker pid, worker host and
    heartbeat timestamp are updated and only the ``result`` field is saved.
    """
    payload = run.result if isinstance(run.result, dict) else {}
    execution = payload.get("execution")
    if not isinstance(execution, dict):
        execution = {}

    last_beat = _parse_iso_datetime(execution.get("heartbeat_at"))
    if last_beat is not None:
        next_due = last_beat + timedelta(seconds=interval_seconds)
        if timezone.now() < next_due:
            # Still within the interval: skip the write.
            return

    refreshed = dict(execution)
    refreshed.update(
        worker_pid=os.getpid(),
        worker_host=socket.gethostname(),
        heartbeat_at=timezone.now().isoformat(),
    )
    payload["execution"] = refreshed
    run.result = payload
    run.save(update_fields=["result"])
|
|
|
|
|
|
def _execution_log_path(result: dict[str, object]) -> Path | None:
    """Return the log path recorded in a run result, if any.

    ``result["execution"]["log"]`` is preferred; the top-level
    ``result["log"]`` is the fallback. Anything other than a non-empty
    string yields ``None``.
    """
    execution = result.get("execution")
    nested_log = execution.get("log") if isinstance(execution, dict) else None
    candidate = nested_log or result.get("log")
    if isinstance(candidate, str) and candidate:
        return Path(candidate)
    return None
|
|
|
|
|
|
def _read_log_tail(log_path: Path | None, *, max_lines: int = 40) -> list[str]:
    """Return up to the last ``max_lines`` lines of the log at ``log_path``.

    The file is decoded as UTF-8 with undecodable bytes replaced.

    Args:
        log_path: Log file to read; ``None`` yields an empty list.
        max_lines: Maximum number of trailing lines to return. Values of
            zero or less yield an empty list.

    Returns:
        The trailing lines without line terminators, or an empty list when
        the file cannot be read.
    """
    # Guard explicitly: ``lines[-0:]`` would return every line, and a
    # negative limit would drop leading lines instead of limiting the tail.
    if log_path is None or max_lines <= 0:
        return []
    try:
        lines = log_path.read_text(encoding="utf-8", errors="replace").splitlines()
    except OSError:
        # Missing or unreadable logs are treated as empty (best effort).
        return []
    return lines[-max_lines:]
|
|
|
|
|
|
def _terminal_rsync_log(log_tail: list[str]) -> bool:
    """Return ``True`` when any line of ``log_tail`` starts with ``rsync error:``."""
    for entry in log_tail:
        if entry.startswith("rsync error:"):
            return True
    return False
|
|
|
|
|
|
# Matches "code <digits>", e.g. the "(code 23)" part of an rsync error line.
_EXIT_CODE_PATTERN = re.compile(r"code ([0-9]+)")


def _exit_code_from_log(log_tail: list[str]) -> int | None:
    """Extract the most recent exit code mentioned in ``log_tail``.

    Lines are scanned from last to first; the first line containing
    ``code <number>`` decides the result. The complete number is parsed, so
    ``code 127`` yields 127 rather than being mistaken for ``code 12``, and
    codes other than 12, 124 and 255 are recognised as well.

    Returns:
        The exit code as an ``int``, or ``None`` when no line mentions one.
    """
    for line in reversed(log_tail):
        match = _EXIT_CODE_PATTERN.search(line)
        if match is not None:
            return int(match.group(1))
    return None
|
|
|
|
|
|
def _running_dry_run_timed_out(*, run: BackupRun, grace_seconds: int) -> bool:
    """Tell whether a dry run has exceeded its timeout plus ``grace_seconds``.

    The timeout is ``run.result["timeout_seconds"]`` when that is a positive
    ``int`` and ``DEFAULT_DRY_RUN_TIMEOUT_SECONDS`` otherwise. A run without
    ``started_at`` never counts as timed out.
    """
    if run.started_at is None:
        return False

    payload = run.result if isinstance(run.result, dict) else {}
    configured = payload.get("timeout_seconds")
    usable = isinstance(configured, int) and configured > 0
    budget_seconds = configured if usable else DEFAULT_DRY_RUN_TIMEOUT_SECONDS

    deadline = run.started_at + timedelta(seconds=budget_seconds + grace_seconds)
    return timezone.now() >= deadline
|
|
|
|
|
|
def _running_worker_timed_out(*, run: BackupRun, stale_worker_seconds: int) -> bool:
    """Tell whether the run's last heartbeat is at least ``stale_worker_seconds`` old.

    The heartbeat is read from ``run.result["execution"]["heartbeat_at"]``;
    ``run.started_at`` is used when no parseable heartbeat is stored. A
    non-positive ``stale_worker_seconds`` disables the check, and a run with
    neither timestamp is never considered stale.
    """
    if stale_worker_seconds <= 0:
        return False

    payload = run.result if isinstance(run.result, dict) else {}
    execution = payload.get("execution")
    if not isinstance(execution, dict):
        execution = {}

    last_seen = _parse_iso_datetime(execution.get("heartbeat_at"))
    if last_seen is None:
        last_seen = run.started_at
        if last_seen is None:
            return False

    stale_after = last_seen + timedelta(seconds=stale_worker_seconds)
    return timezone.now() >= stale_after
|
|
|
|
|
|
def _parse_iso_datetime(value: object) -> datetime | None:
    """Parse an ISO 8601 string into a timezone-aware ``datetime``.

    Args:
        value: Candidate value; anything other than a non-empty string is
            rejected.

    Returns:
        The parsed datetime, with naive values interpreted as UTC, or
        ``None`` when ``value`` is not a non-empty string or cannot be
        parsed by ``datetime.fromisoformat``.
    """
    if not isinstance(value, str) or not value:
        return None
    try:
        # Use the standard-library class directly rather than reaching it
        # through ``django.utils.timezone.datetime``, which is only an
        # incidental re-export of that module.
        parsed = datetime.fromisoformat(value)
    except ValueError:
        return None
    if timezone.is_naive(parsed):
        return timezone.make_aware(parsed, timezone=datetime_timezone.utc)
    return parsed
|