Files
pobsync/src/pobsync_backend/views.py
Peter van Arkel 728e5c740a (feature) Add optional verbose rsync output for manual backups
Expose a verbose rsync output option in the Django manual backup form and
store the selected value with the queued run request.

Propagate the option through the worker, direct management command, and
rsync command builder so real backups can emit itemized changes, file-list
progress, and stats when requested. Dry-runs continue to use verbose output
by default and report that consistently in requested options.

Cover the queue, worker, view, and rsync command behavior with focused
tests.
2026-05-19 22:13:33 +02:00

650 lines
24 KiB
Python

from __future__ import annotations
import json
import shutil
import subprocess
from pathlib import Path
from django.contrib import messages
from django.contrib.admin.views.decorators import staff_member_required
from django.conf import settings
from django.db.models import Count
from django.shortcuts import get_object_or_404, redirect, render
from django.utils import timezone
from django.views.decorators.http import require_POST
from pobsync.errors import PobsyncError
from pobsync.config.defaults import DEFAULT_EXCLUDES, DEFAULT_RSYNC_ARGS
from .backup_runner import queue_backup_run
from .config_checks import collect_effective_host_config_checks, collect_global_config_checks
from .forms import (
CreateHostConfigForm,
GlobalConfigForm,
HostConfigForm,
ManualBackupForm,
RetentionApplyForm,
SshCredentialGenerateForm,
ScheduleConfigForm,
SshCredentialForm,
)
from .host_ops import collect_host_checks, ensure_host_directories
from .models import BackupRun, GlobalConfig, HostConfig, ScheduleConfig, SnapshotRecord, SshCredential
from .retention import run_sql_retention_apply, run_sql_retention_plan
from .self_check import collect_self_checks, summarize_self_checks
from .snapshot_discovery import discover_snapshots, inspect_snapshot_discovery
from .ssh_keys import SshKeyError, delete_generated_key_files, generate_ssh_key, merge_known_hosts, scan_known_host
@staff_member_required
def dashboard(request):
    """Render the dashboard with hosts, recent runs, and overall counters.

    Every host carries the annotated ``snapshot_count`` / ``run_count`` values
    and gets a ``latest_snapshot`` attribute attached for the template.
    """
    host_queryset = HostConfig.objects.annotate(
        snapshot_count=Count("snapshots", distinct=True),
        run_count=Count("runs", distinct=True),
    ).order_by("host")
    hosts = list(host_queryset)

    newest_first = ("-started_at", "-discovered_at", "-id")
    for entry in hosts:
        # Looked up per host and stored on the instance itself.
        entry.latest_snapshot = entry.snapshots.select_related("base").order_by(*newest_first).first()

    default_config = GlobalConfig.objects.filter(name="default").first()
    recent_runs = BackupRun.objects.select_related("host", "snapshot").order_by("-created_at")[:10]
    counts = {
        "global_configs": GlobalConfig.objects.count(),
        "hosts": HostConfig.objects.count(),
        "enabled_hosts": HostConfig.objects.filter(enabled=True).count(),
        "schedules": ScheduleConfig.objects.count(),
        "enabled_schedules": ScheduleConfig.objects.filter(enabled=True).count(),
        "snapshots": SnapshotRecord.objects.count(),
        "runs": BackupRun.objects.count(),
        "running_runs": BackupRun.objects.filter(status=BackupRun.Status.RUNNING).count(),
        "failed_runs": BackupRun.objects.filter(status=BackupRun.Status.FAILED).count(),
    }
    return render(
        request,
        "pobsync_backend/dashboard.html",
        {
            "hosts": hosts,
            "global_config": default_config,
            "latest_runs": recent_runs,
            "counts": counts,
        },
    )
@staff_member_required
def self_check(request):
    """Collect the self checks and render them together with their summary."""
    results = collect_self_checks()
    page_context = {
        "checks": results,
        "summary": summarize_self_checks(results),
    }
    return render(request, "pobsync_backend/self_check.html", page_context)
@staff_member_required
def logs(request):
    """Render the log viewer from the context built by ``_log_context``."""
    return render(request, "pobsync_backend/logs.html", _log_context(request))
@staff_member_required
def ssh_credentials(request):
    """Render the list of SSH credentials, ordered by name."""
    by_name = SshCredential.objects.order_by("name")
    return render(
        request,
        "pobsync_backend/ssh_credentials.html",
        {"credentials": by_name},
    )
@staff_member_required
def create_ssh_credential(request):
    """Show the SSH credential form and save a new credential on a valid POST.

    A valid POST redirects to the credential list; any other request
    (including an invalid POST) renders the form.
    """
    if request.method != "POST":
        form = SshCredentialForm()
    else:
        form = SshCredentialForm(request.POST, request.FILES)
        if form.is_valid():
            created = form.save()
            messages.success(request, f"SSH credential saved for {created.name}.")
            return redirect("ssh_credentials")

    page_context = {"form": form, "credential": None}
    return render(request, "pobsync_backend/ssh_credential_form.html", page_context)
@staff_member_required
def generate_ssh_credential(request):
    """Create an SSH credential and generate its key on a valid POST.

    If key generation raises ``SshKeyError`` the freshly created credential is
    deleted again and the error is attached to the form. On success the
    credential may also be stored as the default on the "default" global
    config when the form requested it.
    """
    if request.method != "POST":
        form = SshCredentialGenerateForm()
    else:
        form = SshCredentialGenerateForm(request.POST)
        if form.is_valid():
            data = form.cleaned_data
            credential = SshCredential.objects.create(
                name=data["name"],
                key_type=data["key_type"],
                known_hosts=data["known_hosts"],
                notes=data["notes"],
            )
            try:
                credential = generate_ssh_key(credential, key_type=data["key_type"])
            except SshKeyError as exc:
                # Do not keep a credential row that has no usable key.
                credential.delete()
                form.add_error(None, str(exc))
            else:
                default_config = None
                if data["set_global_default"]:
                    default_config = GlobalConfig.objects.filter(name="default").first()
                if default_config is not None:
                    default_config.default_ssh_credential = credential
                    default_config.save(update_fields=["default_ssh_credential", "updated_at"])
                messages.success(request, f"SSH key generated for {credential.name}.")
                return redirect("ssh_credentials")

    return render(request, "pobsync_backend/ssh_credential_generate.html", {"form": form})
@staff_member_required
def edit_ssh_credential(request, credential_id: int):
    """Edit an existing SSH credential; responds with 404 for an unknown id."""
    credential = get_object_or_404(SshCredential, id=credential_id)

    if request.method != "POST":
        form = SshCredentialForm(instance=credential)
    else:
        form = SshCredentialForm(request.POST, request.FILES, instance=credential)
        if form.is_valid():
            updated = form.save()
            messages.success(request, f"SSH credential saved for {updated.name}.")
            return redirect("ssh_credentials")

    page_context = {"form": form, "credential": credential}
    return render(request, "pobsync_backend/ssh_credential_form.html", page_context)
@staff_member_required
@require_POST
def delete_ssh_credential(request, credential_id: int):
    """Delete an SSH credential that is not referenced anywhere.

    Deletion is refused while hosts or global configs still use the
    credential. Key files are removed first; an ``SshKeyError`` while doing so
    aborts the deletion and sends the user back to the edit page.
    """
    credential = get_object_or_404(SshCredential, id=credential_id)

    in_use = credential.hosts.exists() or credential.global_configs.exists()
    if in_use:
        messages.error(request, f"SSH key {credential.name} is still in use and cannot be deleted.")
        return redirect("edit_ssh_credential", credential_id=credential.id)

    # Remember the name for the messages shown after the row is gone.
    label = credential.name
    try:
        if credential.generated or credential.key_path:
            delete_generated_key_files(credential)
    except SshKeyError as exc:
        messages.error(request, f"Could not delete SSH key files for {label}: {exc}")
        return redirect("edit_ssh_credential", credential_id=credential.id)

    credential.delete()
    messages.success(request, f"SSH key deleted: {label}.")
    return redirect("ssh_credentials")
@staff_member_required
def edit_global_config(request):
    """Create or edit the global config named "default".

    A valid POST saves the form and redirects to the dashboard. On GET the
    form is bound to the existing config, or pre-filled with
    ``_default_global_initial()`` when no config exists yet.
    """
    global_config = GlobalConfig.objects.filter(name="default").first()

    if request.method == "POST":
        form = GlobalConfigForm(request.POST, instance=global_config)
        if form.is_valid():
            stored = form.save()
            messages.success(request, f"Global config saved for {stored.name}.")
            return redirect("dashboard")
    elif global_config:
        form = GlobalConfigForm(instance=global_config)
    else:
        form = GlobalConfigForm(initial=_default_global_initial())

    # Checks only make sense once a config exists.
    if global_config:
        config_checks = collect_global_config_checks(global_config)
    else:
        config_checks = []

    page_context = {
        "global_config": global_config,
        "form": form,
        "backup_root": settings.POBSYNC_BACKUP_ROOT,
        "config_checks": config_checks,
        "config_check_summary": summarize_self_checks(config_checks),
    }
    return render(request, "pobsync_backend/global_form.html", page_context)
@staff_member_required
def create_host_config(request):
    """Create a host config and try to prepare its backup directories.

    On a valid POST the host config is saved, then
    ``ensure_host_directories`` is attempted. Exactly one message is emitted
    per outcome (success with the prepared path, or a warning that the config
    was created but the directories could not be prepared) and the user is
    redirected to the host detail page. Any other request renders the form;
    a GET pre-fills it with ``_default_host_initial()``.
    """
    if request.method == "POST":
        form = CreateHostConfigForm(request.POST)
        if form.is_valid():
            host_config = form.save()
            try:
                host_root = ensure_host_directories(host_config)
            except Exception as exc:
                # Deliberately best-effort: the config is already saved, so a
                # directory failure is reported instead of failing the request.
                messages.warning(request, f"Host config created, but backup directories could not be prepared: {exc}")
            else:
                messages.success(request, f"Host config created for {host_config.host}; prepared {host_root}.")
            return redirect("host_detail", host=host_config.host)
    else:
        form = CreateHostConfigForm(initial=_default_host_initial())
    return render(
        request,
        "pobsync_backend/host_form.html",
        {
            "host": None,
            "form": form,
        },
    )
@staff_member_required
def host_detail(request, host: str):
    """Render the detail page for one host; 404 when the host is unknown.

    The context includes the host's schedule, discovery state, host checks,
    a pre-filled manual backup form, recent runs and snapshots, and counters.
    """
    host_config = get_object_or_404(HostConfig, host=host)
    state = BackupRun.Status

    queued = host_config.runs.filter(status=state.QUEUED)
    running = host_config.runs.filter(status=state.RUNNING)
    # Oldest run that is still queued or running, if there is one.
    pending = host_config.runs.filter(status__in=[state.QUEUED, state.RUNNING])
    active_run = pending.order_by("created_at", "id").first()

    has_global_config = GlobalConfig.objects.filter(name="default").exists()
    host_checks = collect_host_checks(host_config)

    page_context = {
        "host": host_config,
        "schedule": _schedule_for_host(host_config),
        "discovery": inspect_snapshot_discovery(host=host_config),
        "host_checks": host_checks,
        "host_check_summary": summarize_self_checks(host_checks),
        "manual_backup_form": ManualBackupForm(initial=_default_manual_backup_initial(host_config)),
        # Queueing needs both an enabled host and the default global config.
        "can_queue_backup": host_config.enabled and has_global_config,
        "has_global_config": has_global_config,
        "active_run": active_run,
        "latest_runs": host_config.runs.select_related("snapshot").order_by("-created_at")[:10],
        "snapshots": host_config.snapshots.select_related("base").order_by("-started_at", "dirname")[:20],
        "counts": {
            "snapshots": host_config.snapshots.count(),
            "runs": host_config.runs.count(),
            "queued_runs": queued.count(),
            "running_runs": running.count(),
            "failed_runs": host_config.runs.filter(status=state.FAILED).count(),
            "incomplete_snapshots": host_config.snapshots.filter(kind=SnapshotRecord.Kind.INCOMPLETE).count(),
        },
    }
    return render(request, "pobsync_backend/host_detail.html", page_context)
@staff_member_required
@require_POST
def prepare_host_directories(request, host: str):
    """Prepare the backup directories for a host and report the outcome.

    Always redirects back to the host detail page; a failure is shown as an
    error message rather than raised.
    """
    host_config = get_object_or_404(HostConfig, host=host)
    try:
        prepared_root = ensure_host_directories(host_config)
    except Exception as exc:
        failure = f"Could not prepare backup directories for {host_config.host}: {exc}"
        messages.error(request, failure)
    else:
        confirmation = f"Prepared backup directories for {host_config.host}: {prepared_root}"
        messages.success(request, confirmation)
    return redirect("host_detail", host=host_config.host)
@staff_member_required
@require_POST
def scan_host_known_key(request, host: str):
    """Scan the host's SSH key and merge it into the credential's known hosts.

    The credential and port come from the host config, falling back to the
    "default" global config (and port 22). Without a credential an error is
    shown. The view always redirects back to the host detail page.
    """
    host_config = get_object_or_404(HostConfig, host=host)
    global_config = GlobalConfig.objects.filter(name="default").first()

    credential = host_config.ssh_credential
    if not credential:
        credential = global_config.default_ssh_credential if global_config else None
    if credential is None:
        messages.error(request, f"No SSH credential is selected for {host_config.host}.")
        return redirect("host_detail", host=host_config.host)

    port = host_config.ssh_port
    if not port:
        port = global_config.ssh_port if global_config else 22

    try:
        scanned_keys = scan_known_host(host_config.address, port=int(port or 22))
    except SshKeyError as exc:
        messages.error(request, f"Could not scan SSH host key for {host_config.host}: {exc}")
    else:
        credential.known_hosts = merge_known_hosts(credential.known_hosts, scanned_keys)
        credential.save(update_fields=["known_hosts", "updated_at"])
        messages.success(request, f"Stored SSH host key for {host_config.host} on credential {credential.name}.")
    return redirect("host_detail", host=host_config.host)
@staff_member_required
@require_POST
def queue_manual_backup(request, host: str):
    """Queue a manual backup run for a host from the submitted form options.

    Refuses (with an error message and a redirect to the host page) when the
    host is disabled, the "default" global config is missing, or the form is
    invalid. On success redirects to the detail page of the new run.
    """
    host_config = get_object_or_404(HostConfig, host=host)
    back_to_host = {"host": host_config.host}

    if not host_config.enabled:
        messages.error(request, f"Cannot queue backup for disabled host {host_config.host}.")
        return redirect("host_detail", **back_to_host)

    default_exists = GlobalConfig.objects.filter(name="default").exists()
    if not default_exists:
        messages.error(request, "Create the default global config before queueing backups.")
        return redirect("host_detail", **back_to_host)

    form = ManualBackupForm(request.POST)
    if not form.is_valid():
        messages.error(request, "Manual backup options are invalid.")
        return redirect("host_detail", **back_to_host)

    options = form.cleaned_data
    run = queue_backup_run(
        host=host_config,
        dry_run=options["dry_run"],
        verbose_output=options["verbose_output"],
        prune=options["prune"],
        prune_max_delete=options["prune_max_delete"],
        prune_protect_bases=options["prune_protect_bases"],
    )
    messages.success(request, f"Queued manual backup run {run.id} for {host_config.host}.")
    return redirect("run_detail", run_id=run.id)
@staff_member_required
def run_detail(request, run_id: int):
    """Render one backup run, its requested options, and its result as JSON."""
    run = get_object_or_404(BackupRun.objects.select_related("host", "snapshot"), id=run_id)

    cancellable_states = {BackupRun.Status.QUEUED, BackupRun.Status.RUNNING}
    if isinstance(run.result, dict):
        requested = run.result.get("requested")
    else:
        requested = {}

    page_context = {
        "run": run,
        "can_cancel": run.status in cancellable_states,
        "requested": requested,
        "result_json": _pretty_json(run.result),
    }
    return render(request, "pobsync_backend/run_detail.html", page_context)
@staff_member_required
@require_POST
def cancel_run(request, run_id: int):
    """Mark a queued or running backup run as cancelled.

    A ``cancellation`` entry (request time and previous status) is stored in
    the run's result. Runs that were still queued also get ``ended_at`` set.
    Runs in any other state are left alone and a warning is shown.
    """
    run = get_object_or_404(BackupRun.objects.select_related("host"), id=run_id)
    state = BackupRun.Status

    if run.status not in {state.QUEUED, state.RUNNING}:
        messages.warning(request, f"Run {run.id} is already {run.status}.")
        return redirect("run_detail", run_id=run.id)

    previous_status = run.status
    # Copy so the stored result is replaced rather than mutated in place.
    payload = dict(run.result) if isinstance(run.result, dict) else {}
    payload["cancellation"] = {
        "requested_at": timezone.now().isoformat(),
        "previous_status": previous_status,
    }

    run.status = state.CANCELLED
    run.result = payload
    changed_fields = ["status", "result"]
    if previous_status == state.QUEUED:
        run.ended_at = timezone.now()
        changed_fields.append("ended_at")
    run.save(update_fields=changed_fields)

    messages.success(request, f"Cancellation requested for run {run.id}.")
    return redirect("run_detail", run_id=run.id)
@staff_member_required
def snapshot_detail(request, snapshot_id: int):
    """Render one snapshot with its metadata, backup runs, and derived snapshots."""
    snapshot_queryset = SnapshotRecord.objects.select_related("host", "base").prefetch_related(
        "derived_snapshots",
        "backup_runs",
    )
    snapshot = get_object_or_404(snapshot_queryset, id=snapshot_id)

    related_runs = snapshot.backup_runs.select_related("host").order_by("-created_at")
    derived = snapshot.derived_snapshots.select_related("host").order_by("-started_at", "dirname")
    page_context = {
        "snapshot": snapshot,
        "metadata_json": _pretty_json(snapshot.metadata),
        "backup_runs": related_runs,
        "derived_snapshots": derived,
    }
    return render(request, "pobsync_backend/snapshot_detail.html", page_context)
@staff_member_required
@require_POST
def discover_host_snapshots(request, host: str):
    """Run snapshot discovery for a host and report the result as a message.

    When nothing was scanned, the discovery inspection message is appended
    and shown as a warning. Always redirects back to the host detail page.
    """
    host_config = get_object_or_404(HostConfig, host=host)
    try:
        outcome = discover_snapshots(host=host_config)
    except Exception as exc:
        messages.error(request, f"Snapshot discovery failed for {host_config.host}: {exc}")
    else:
        summary = (
            f"Snapshot discovery scanned {outcome['scanned']} items for {host_config.host}: "
            f"{outcome['created']} created, {outcome['updated']} updated."
        )
        if not outcome["scanned"]:
            inspection = inspect_snapshot_discovery(host=host_config)
            messages.warning(request, f"{summary} {inspection['message']}")
        else:
            messages.success(request, summary)
    return redirect("host_detail", host=host_config.host)
@staff_member_required
def host_retention_plan(request, host: str):
    """Show the retention plan for a host.

    ``kind`` (default "scheduled") and ``protect_bases`` are read from the
    query string. An invalid kind or a ``PobsyncError`` from planning shows an
    error message and redirects to the host detail page.
    """
    host_config = get_object_or_404(HostConfig, host=host)

    allowed_kinds = ("scheduled", "manual", "all")
    truthy_flags = ("1", "true", "on", "yes")

    kind = request.GET.get("kind", "scheduled")
    if kind not in allowed_kinds:
        messages.error(request, "Retention kind must be scheduled, manual, or all.")
        return redirect("host_detail", host=host_config.host)
    protect_bases = request.GET.get("protect_bases") in truthy_flags

    try:
        plan = run_sql_retention_plan(host=host_config.host, kind=kind, protect_bases=protect_bases)
    except PobsyncError as exc:
        messages.error(request, str(exc))
        return redirect("host_detail", host=host_config.host)

    # Pre-fill the apply form so it matches the plan being displayed.
    apply_form = RetentionApplyForm(
        host_name=host_config.host,
        initial={
            "kind": kind,
            "protect_bases": protect_bases,
            "max_delete": len(plan["delete"]),
        },
    )
    page_context = {
        "host": host_config,
        "kind": kind,
        "protect_bases": protect_bases,
        "plan": plan,
        "apply_form": apply_form,
    }
    return render(request, "pobsync_backend/retention_plan.html", page_context)
@staff_member_required
@require_POST
def apply_host_retention(request, host: str):
    """Apply retention for a host after the confirmation form validates.

    A ``PobsyncError`` is shown as an error message. Whether or not the apply
    succeeded, the user is redirected to the retention plan page with the same
    ``kind`` / ``protect_bases`` query parameters.
    """
    host_config = get_object_or_404(HostConfig, host=host)
    form = RetentionApplyForm(request.POST, host_name=host_config.host)
    if not form.is_valid():
        messages.error(request, "Retention apply confirmation is invalid.")
        return redirect("host_retention_plan", host=host_config.host)

    kind = form.cleaned_data["kind"]
    protect_bases = bool(form.cleaned_data["protect_bases"])
    try:
        outcome = run_sql_retention_apply(
            prefix=Path(settings.POBSYNC_HOME),
            host=host_config.host,
            kind=kind,
            protect_bases=protect_bases,
            yes=True,
            max_delete=form.cleaned_data["max_delete"],
        )
    except PobsyncError as exc:
        messages.error(request, str(exc))
    else:
        messages.success(request, f"Retention deleted {len(outcome['deleted'])} snapshot(s) for {host_config.host}.")

    # Re-attach the plan parameters to the redirect target.
    params = [f"kind={kind}"]
    if protect_bases:
        params.append("protect_bases=1")
    response = redirect("host_retention_plan", host=host_config.host)
    response["Location"] = f"{response['Location']}?{'&'.join(params)}"
    return response
@staff_member_required
def edit_host_config(request, host: str):
    """Edit a host config; 404 when the host is unknown.

    The effective-config checks are collected before the form is processed
    and only when the "default" global config exists.
    """
    host_config = get_object_or_404(HostConfig, host=host)
    global_config = GlobalConfig.objects.filter(name="default").first()
    if global_config:
        config_checks = collect_effective_host_config_checks(host_config, global_config)
    else:
        config_checks = []

    if request.method != "POST":
        form = HostConfigForm(instance=host_config)
    else:
        form = HostConfigForm(request.POST, instance=host_config)
        if form.is_valid():
            form.save()
            messages.success(request, f"Host config saved for {host_config.host}.")
            return redirect("host_detail", host=host_config.host)

    page_context = {
        "host": host_config,
        "form": form,
        "config_checks": config_checks,
        "config_check_summary": summarize_self_checks(config_checks),
    }
    return render(request, "pobsync_backend/host_form.html", page_context)
@staff_member_required
def edit_host_schedule(request, host: str):
    """Create or edit the schedule of a host; 404 when the host is unknown.

    A valid POST saves the schedule, attached to the host, and redirects to
    the host detail page. On GET the form shows the existing schedule, or the
    defaults from ``_default_schedule_initial()`` when the host has none.
    """
    host_config = get_object_or_404(HostConfig, host=host)
    schedule = _schedule_for_host(host_config)
    if request.method == "POST":
        form = ScheduleConfigForm(request.POST, instance=schedule)
        if form.is_valid():
            saved_schedule = form.save(commit=False)
            saved_schedule.host = host_config
            saved_schedule.save()
            messages.success(request, f"Schedule saved for {host_config.host}.")
            return redirect("host_detail", host=host_config.host)
    elif schedule is None:
        form = ScheduleConfigForm(initial=_default_schedule_initial())
    else:
        # ModelForm ``initial`` values take precedence over the instance's
        # values, so defaults must not be passed when a schedule exists or the
        # saved settings would be displayed as the defaults.
        form = ScheduleConfigForm(instance=schedule)
    return render(
        request,
        "pobsync_backend/schedule_form.html",
        {
            "host": host_config,
            "schedule": schedule,
            "form": form,
        },
    )
def _schedule_for_host(host_config: HostConfig) -> ScheduleConfig | None:
    """Return ``host_config.schedule``, or ``None`` when the host has no schedule."""
    try:
        found = host_config.schedule
    except ScheduleConfig.DoesNotExist:
        found = None
    return found
def _default_schedule_initial() -> dict[str, object]:
    """Return the initial form values used for a new schedule."""
    return dict(
        cron_expr="15 2 * * *",
        user="root",
        enabled=True,
        prune_max_delete=10,
    )
def _default_global_initial() -> dict[str, object]:
    """Return the initial form values used when no global config exists yet.

    The default SSH credential is the first credential by name, or ``None``
    when there are no credentials.
    """
    first_credential = SshCredential.objects.order_by("name").first()
    initial: dict[str, object] = {
        "name": "default",
        "default_ssh_credential": first_credential,
        "ssh_user": "root",
        "ssh_port": 22,
        "rsync_binary": "rsync",
        "rsync_args": DEFAULT_RSYNC_ARGS,
        "default_source_root": "/",
        "excludes_default": DEFAULT_EXCLUDES,
    }
    initial.update(
        retention_daily=14,
        retention_weekly=8,
        retention_monthly=12,
        retention_yearly=0,
    )
    return initial
def _default_host_initial() -> dict[str, object]:
    """Return the initial form values for a new host config.

    Values are copied from the "default" global config when it exists;
    otherwise only ``enabled`` and fixed retention defaults are provided.
    """
    global_config = GlobalConfig.objects.filter(name="default").first()
    if global_config is None:
        return dict(
            enabled=True,
            retention_daily=14,
            retention_weekly=8,
            retention_monthly=12,
            retention_yearly=0,
        )
    return {
        "enabled": True,
        "ssh_credential": global_config.default_ssh_credential,
        "ssh_user": global_config.ssh_user,
        "ssh_port": global_config.ssh_port,
        "source_root": global_config.default_source_root,
        "retention_daily": global_config.retention_daily,
        "retention_weekly": global_config.retention_weekly,
        "retention_monthly": global_config.retention_monthly,
        "retention_yearly": global_config.retention_yearly,
    }
def _default_manual_backup_initial(host_config: HostConfig) -> dict[str, object]:
    """Return the initial values for the manual backup form of a host.

    ``dry_run`` is always pre-selected; the prune options mirror the host's
    schedule when one exists and fall back to fixed defaults otherwise.
    """
    schedule = _schedule_for_host(host_config)
    if not schedule:
        return {
            "dry_run": True,
            "prune": False,
            "prune_max_delete": 10,
            "prune_protect_bases": False,
        }
    return {
        "dry_run": True,
        "prune": bool(schedule.prune),
        "prune_max_delete": schedule.prune_max_delete,
        "prune_protect_bases": bool(schedule.prune_protect_bases),
    }
def _pretty_json(value: object) -> str:
    """Serialize ``value`` as indented JSON with sorted keys.

    Any falsy value (``None``, empty containers, ``0``) is rendered as ``{}``.
    """
    payload = value if value else {}
    return json.dumps(payload, sort_keys=True, indent=2)
def _log_context(request) -> dict[str, object]:
    """Build the template context for the journal log viewer.

    Reads ``unit``, ``priority`` (default ``"0..4"``) and ``q`` from
    ``request.GET``, runs ``journalctl`` for the selected unit (or for all
    known pobsync units when the selection is not one of them), and keeps only
    lines containing ``q`` as a case-insensitive substring.

    Problems reading the journal (missing binary, timeout, OS error, non-zero
    exit status) are reported through the ``error`` entry instead of raising.

    Returns:
        A dict with ``units``, ``priorities``, ``selected_unit``,
        ``selected_priority``, ``query``, ``lines`` and ``error``.
    """
    units = ("pobsync-web.service", "pobsync-worker.service", "pobsync-scheduler.service")
    priorities = {
        "": "All",
        "0..3": "Errors",
        "4": "Warnings",
        "5": "Notices",
        "6": "Info",
        "7": "Debug",
    }
    selected_unit = request.GET.get("unit", "")
    priority = request.GET.get("priority", "0..4")
    query = request.GET.get("q", "").strip()
    lines: list[str] = []
    error = ""
    if shutil.which("journalctl") is None:
        error = "journalctl is not available in this runtime."
    else:
        command = ["journalctl", "--no-pager", "-n", "300", "-o", "short-iso"]
        if selected_unit in units:
            command.extend(["-u", selected_unit])
        else:
            # Unknown or empty selection: show all pobsync units.
            for unit in units:
                command.extend(["-u", unit])
        if priority:
            command.extend(["-p", priority])
        try:
            result = subprocess.run(
                command,
                check=False,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                timeout=10,
            )
        except subprocess.TimeoutExpired:
            # A slow journal must not turn the log page into a server error.
            error = "Timed out while reading journal logs."
        except OSError as exc:
            error = f"Could not read journal logs: {exc}"
        else:
            if result.returncode != 0:
                error = result.stderr.strip() or "Could not read journal logs."
            else:
                lines = result.stdout.splitlines()
    if query:
        lowered_query = query.lower()
        lines = [line for line in lines if lowered_query in line.lower()]
    return {
        "units": units,
        "priorities": priorities,
        "selected_unit": selected_unit,
        "selected_priority": priority,
        "query": query,
        "lines": lines,
        "error": error,
    }