Files
pobsync/src/pobsync_backend/backup_runner.py
Peter van Arkel 728e5c740a (feature) Add optional verbose rsync output for manual backups
Expose a verbose rsync output option in the Django manual backup form and
store the selected value with the queued run request.

Propagate the option through the worker, direct management command, and
rsync command builder so real backups can emit itemized changes, file-list
progress, and stats when requested. Dry-runs continue to use verbose output
by default and report that consistently in requested options.

Cover the queue, worker, view, and rsync command behavior with focused
tests.
2026-05-19 22:13:33 +02:00

270 lines
9.1 KiB
Python

from __future__ import annotations
from datetime import timedelta
from pathlib import Path
from django.db import transaction
from django.utils import timezone
from pobsync.commands.run_scheduled import DEFAULT_DRY_RUN_TIMEOUT_SECONDS, classify_rsync_failure, dry_run_log_path, run_scheduled
from pobsync_backend.config_source import DjangoConfigSource
from pobsync_backend.models import BackupRun, HostConfig
from pobsync_backend.retention import run_sql_retention_apply
from pobsync_backend.snapshot_discovery import infer_snapshot_kind, upsert_snapshot_record
def queue_backup_run(
    *,
    host: HostConfig,
    run_type: str = BackupRun.RunType.MANUAL,
    dry_run: bool = False,
    verbose_output: bool = False,
    prune: bool = False,
    prune_max_delete: int = 10,
    prune_protect_bases: bool = False,
) -> BackupRun:
    """Create a ``QUEUED`` ``BackupRun`` for *host* that records the requested options.

    The options are normalised with ``bool()`` / ``int()`` and stored under
    ``result["requested"]``. ``verbose_output`` is recorded as true whenever
    ``dry_run`` is set, in addition to when it was requested explicitly.

    Returns:
        The newly created ``BackupRun`` row.
    """
    requested = {
        "dry_run": bool(dry_run),
        # Dry-runs are always recorded as verbose.
        "verbose_output": bool(dry_run or verbose_output),
        "prune": bool(prune),
        "prune_max_delete": int(prune_max_delete),
        "prune_protect_bases": bool(prune_protect_bases),
    }
    return BackupRun.objects.create(
        host=host,
        run_type=run_type,
        status=BackupRun.Status.QUEUED,
        result={"requested": requested},
    )
def execute_backup_run(
    *,
    run: BackupRun,
    prefix: Path,
    dry_run: bool = False,
    verbose_output: bool = False,
    prune: bool = False,
    prune_max_delete: int = 10,
    prune_protect_bases: bool = False,
) -> BackupRun:
    """Execute *run* through ``run_scheduled`` and persist its outcome.

    The run is first saved as ``RUNNING`` (keeping an existing ``started_at``).
    ``run_scheduled`` is always called with ``prune=False``; pruning is done
    afterwards via ``run_sql_retention_apply`` and only when the result is ok,
    is not a dry-run, and *prune* is set. Verbose rsync output is requested
    when either *dry_run* or *verbose_output* is truthy.

    Returns:
        The same ``run`` instance after its final state has been saved.

    Raises:
        Exception: whatever ``run_scheduled`` or ``run_sql_retention_apply``
            raised, re-raised after the failure has been stored on the run.
    """
    # Same field list for both final saves (prune failure and normal completion).
    final_fields = [
        "status",
        "ended_at",
        "snapshot_path",
        "snapshot",
        "base_path",
        "rsync_exit_code",
        "result",
    ]

    run.status = BackupRun.Status.RUNNING
    if not run.started_at:
        run.started_at = timezone.now()
    run.result = _running_result(run=run, dry_run=bool(dry_run))
    run.save(update_fields=["status", "started_at", "result"])

    try:
        result = run_scheduled(
            prefix=prefix,
            host=run.host.host,
            dry_run=bool(dry_run),
            prune=False,
            config_source=DjangoConfigSource(),
            run_id=run.id,
            cancel_check=lambda: _run_cancel_requested(run.id),
            verbose_output=bool(dry_run or verbose_output),
        )
    except Exception as exc:
        # Reload first so a cancellation stored while the backup ran is not
        # overwritten with FAILED.
        run.refresh_from_db()
        if run.status == BackupRun.Status.CANCELLED:
            run.status = BackupRun.Status.CANCELLED
        else:
            run.status = BackupRun.Status.FAILED
        run.ended_at = timezone.now()
        previous = run.result if isinstance(run.result, dict) else {}
        run.result = {
            **previous,
            "ok": False,
            "error": str(exc),
            "type": type(exc).__name__,
        }
        run.save(update_fields=["status", "ended_at", "result"])
        raise

    run.refresh_from_db()
    if result.get("cancelled") or run.status == BackupRun.Status.CANCELLED:
        run.status = BackupRun.Status.CANCELLED
    elif result.get("ok"):
        run.status = BackupRun.Status.SUCCESS
    else:
        run.status = BackupRun.Status.FAILED
    run.ended_at = timezone.now()
    run.snapshot_path = str(result.get("snapshot") or "")
    run.base_path = str(result.get("base") or "")
    rsync_info = result.get("rsync")
    run.rsync_exit_code = rsync_info.get("exit_code") if isinstance(rsync_info, dict) else None
    run.result = result

    # Link the run to a snapshot record when a snapshot path was reported; a
    # ValueError from the discovery helpers leaves the run without a record.
    snapshot_record = None
    if run.snapshot_path:
        snapshot_dir = Path(run.snapshot_path)
        try:
            kind = infer_snapshot_kind(snapshot_dir)
            snapshot_record, _ = upsert_snapshot_record(host=run.host, kind=kind, snapshot_dir=snapshot_dir)
        except ValueError:
            snapshot_record = None

    wants_prune = result.get("ok") and not result.get("dry_run") and prune
    if wants_prune:
        try:
            result["prune"] = run_sql_retention_apply(
                prefix=prefix,
                host=run.host.host,
                kind="scheduled",
                protect_bases=bool(prune_protect_bases),
                yes=True,
                max_delete=int(prune_max_delete),
                acquire_lock=False,
            )
        except Exception as exc:
            # A failed prune marks the whole run as failed, but the backup
            # details gathered above are still saved before re-raising.
            result["prune"] = {"ok": False, "error": str(exc), "type": type(exc).__name__}
            run.status = BackupRun.Status.FAILED
            run.result = result
            run.snapshot = snapshot_record
            run.save(update_fields=final_fields)
            raise

    run.snapshot = snapshot_record
    run.result = result
    run.save(update_fields=final_fields)
    return run
def claim_next_queued_run() -> BackupRun | None:
    """Claim the oldest queued run of an enabled host and mark it ``RUNNING``.

    Candidates are ordered by ``created_at`` then ``id``. The claim itself is a
    single conditional ``UPDATE`` that only matches while the row is still
    ``QUEUED``, so when several workers pick the same candidate exactly one of
    them wins and the others move on to the next candidate. A plain
    read-then-``save()`` sequence takes no row lock and would let two workers
    claim the same run.

    Returns:
        The claimed run, with ``status`` and ``started_at`` updated both in the
        database and on the returned instance, or ``None`` when no queued run
        could be claimed.
    """
    # Ids we failed to claim; excluded so the loop always makes progress even
    # if this connection keeps seeing a stale QUEUED status for them.
    lost_ids: list[int] = []
    while True:
        candidate = (
            BackupRun.objects.select_related("host")
            .filter(status=BackupRun.Status.QUEUED, host__enabled=True)
            .exclude(id__in=lost_ids)
            .order_by("created_at", "id")
            .first()
        )
        if candidate is None:
            return None

        started_at = timezone.now()
        # NOTE(review): ``update()`` does not call ``save()`` or send model
        # signals - confirm nothing relies on them for this transition.
        claimed = BackupRun.objects.filter(
            id=candidate.id,
            status=BackupRun.Status.QUEUED,
        ).update(status=BackupRun.Status.RUNNING, started_at=started_at)
        if claimed:
            candidate.status = BackupRun.Status.RUNNING
            candidate.started_at = started_at
            return candidate

        # The row left QUEUED between the read and the update (claimed or
        # cancelled elsewhere); try the next candidate.
        lost_ids.append(candidate.id)
def reconcile_running_runs(*, grace_seconds: int = 300) -> int:
    """Reconcile every ``RUNNING`` run and return how many were changed.

    Runs are visited ordered by ``started_at`` then ``id``; each one is handed
    to ``_reconcile_running_run`` together with *grace_seconds*.
    """
    running = (
        BackupRun.objects.select_related("host")
        .filter(status=BackupRun.Status.RUNNING)
        .order_by("started_at", "id")
    )
    return sum(
        1
        for run in running
        if _reconcile_running_run(run=run, grace_seconds=grace_seconds)
    )
def requested_options(run: BackupRun) -> dict[str, object]:
    """Return the options stored under ``run.result["requested"]``.

    An empty dict is returned when ``run.result`` is not a dict or when the
    stored ``"requested"`` value is not a dict.
    """
    if isinstance(run.result, dict):
        stored = run.result.get("requested")
        if isinstance(stored, dict):
            return stored
    return {}
def _running_result(*, run: BackupRun, dry_run: bool) -> dict[str, object]:
result = dict(run.result) if isinstance(run.result, dict) else {}
execution = {
"started_at": (run.started_at or timezone.now()).isoformat(),
}
if dry_run:
execution["log"] = str(dry_run_log_path(run.host.host, run_id=run.id))
result["execution"] = execution
return result
def _run_cancel_requested(run_id: int) -> bool:
    """Return whether the run with id *run_id* is currently stored as ``CANCELLED``."""
    cancelled_rows = BackupRun.objects.filter(
        id=run_id,
        status=BackupRun.Status.CANCELLED,
    )
    return cancelled_rows.exists()
def _reconcile_running_run(*, run: BackupRun, grace_seconds: int) -> bool:
    """Mark a ``RUNNING`` dry-run as failed when its log or age shows it is over.

    Only runs whose stored ``requested["dry_run"]`` is truthy are considered.
    Such a run is failed when the tail of its log contains an ``rsync error:``
    line or when ``_running_dry_run_timed_out`` reports a timeout. The exit
    code comes from the log when recognised, otherwise 124 for a timeout and
    255 for anything else.

    Returns:
        ``True`` when the run was updated and saved, ``False`` when it was
        left untouched.
    """
    result = run.result if isinstance(run.result, dict) else {}
    requested = result.get("requested")
    if not isinstance(requested, dict) or not requested.get("dry_run"):
        return False

    log_path = _execution_log_path(result)
    log_tail = [] if log_path is None else _read_log_tail(log_path)
    has_terminal_line = _terminal_rsync_log(log_tail)
    timed_out = _running_dry_run_timed_out(run=run, grace_seconds=grace_seconds)
    if not (has_terminal_line or timed_out):
        return False

    exit_code = _exit_code_from_log(log_tail)
    if not exit_code:
        exit_code = 124 if timed_out else 255
    failure = classify_rsync_failure(exit_code, log_tail)

    previous_rsync = result.get("rsync")
    if not isinstance(previous_rsync, dict):
        previous_rsync = {}
    outcome = {
        "ok": False,
        "dry_run": True,
        "host": run.host.host,
        "base": result.get("base"),
        "log": str(log_path) if log_path else "",
        "failure": failure,
        # Keep any rsync details already recorded, overriding code and tail.
        "rsync": {
            **previous_rsync,
            "exit_code": exit_code,
            "log_tail": log_tail,
        },
    }
    result.update(outcome)

    run.status = BackupRun.Status.FAILED
    run.ended_at = timezone.now()
    run.rsync_exit_code = exit_code
    run.result = result
    run.save(update_fields=["status", "ended_at", "rsync_exit_code", "result"])
    return True
def _execution_log_path(result: dict[str, object]) -> Path | None:
execution = result.get("execution") if isinstance(result.get("execution"), dict) else {}
log = execution.get("log") or result.get("log")
if not isinstance(log, str) or not log:
return None
return Path(log)
def _read_log_tail(log_path: Path | None, *, max_lines: int = 40) -> list[str]:
if log_path is None:
return []
try:
return log_path.read_text(encoding="utf-8", errors="replace").splitlines()[-max_lines:]
except OSError:
return []
def _terminal_rsync_log(log_tail: list[str]) -> bool:
return any(line.startswith("rsync error:") for line in log_tail)
def _exit_code_from_log(log_tail: list[str]) -> int | None:
for line in reversed(log_tail):
if "code 255" in line:
return 255
if "code 124" in line:
return 124
if "code 12" in line:
return 12
return None
def _running_dry_run_timed_out(*, run: BackupRun, grace_seconds: int) -> bool:
if run.started_at is None:
return False
result = run.result if isinstance(run.result, dict) else {}
timeout_seconds = result.get("timeout_seconds")
if not isinstance(timeout_seconds, int) or timeout_seconds <= 0:
timeout_seconds = DEFAULT_DRY_RUN_TIMEOUT_SECONDS
return timezone.now() >= run.started_at + timedelta(seconds=timeout_seconds + grace_seconds)