Files
pobsync/src/pobsync_backend/views.py
Peter van Arkel c2e5a534aa (release) Add review resolution for operational tasks
Add reviewed state for failed/warning runs and incomplete snapshot records,
then use it to clear dashboard and host “need review” tasks after an operator
has acknowledged them.

Expose Mark reviewed actions on run detail and host retention warnings, keep
reviewed records available for audit/debug, and exclude reviewed problem runs
from operational counts and latest issue summaries.

Refs #19
Refs #8
2026-05-21 03:34:41 +02:00

1158 lines
45 KiB
Python

from __future__ import annotations
import json
import shlex
import shutil
import subprocess
from pathlib import Path
from django.contrib import messages
from django.contrib.admin.views.decorators import staff_member_required
from django.conf import settings
from django.http import FileResponse, Http404
from django.db.models import Count, Q
from django.shortcuts import get_object_or_404, redirect, render
from django.utils import timezone
from django.views.decorators.http import require_POST
from pobsync import __version__
from pobsync.errors import PobsyncError
from pobsync.config.defaults import DEFAULT_EXCLUDES, DEFAULT_RSYNC_ARGS
from .backup_runner import queue_backup_run
from .config_checks import collect_effective_host_config_checks, collect_global_config_checks
from .forms import (
CreateHostConfigForm,
GlobalConfigForm,
HostConfigForm,
IncompleteCleanupForm,
ManualBackupForm,
RetentionApplyForm,
SshCredentialGenerateForm,
ScheduleConfigForm,
SshCredentialForm,
)
from .host_ops import ensure_host_directories
from .models import BackupRun, GlobalConfig, HostConfig, ScheduleConfig, SnapshotRecord, SshCredential
from .preflight import collect_backup_gate, effective_host_config_preview, run_remote_preflight
from .retention import run_incomplete_cleanup, run_sql_retention_apply, run_sql_retention_plan
from .self_check import collect_self_checks, summarize_self_checks
from .scheduler import next_due_after
from .snapshot_discovery import discover_snapshots, inspect_snapshot_discovery
from .ssh_keys import SshKeyError, delete_generated_key_files, generate_ssh_key, merge_known_hosts, scan_known_host
from .stats_summary import collect_dashboard_stats, collect_host_stats
@staff_member_required
def dashboard(request):
    """Render the dashboard: one annotated row per host plus overall counters.

    Warning and failed runs whose ``reviewed_at`` is set are excluded from
    both the per-host annotations and the installation-wide counts.
    """

    def runs_in(status, **extra):
        # Distinct count of a host's runs in ``status``; ``extra`` adds more
        # conditions to the same ``runs`` filter.
        return Count("runs", filter=Q(runs__status=status, **extra), distinct=True)

    global_config = GlobalConfig.objects.filter(name="default").first()

    annotated_hosts = (
        HostConfig.objects.select_related("schedule")
        .annotate(
            snapshot_count=Count("snapshots", distinct=True),
            run_count=Count("runs", distinct=True),
            queued_run_count=runs_in(BackupRun.Status.QUEUED),
            running_run_count=runs_in(BackupRun.Status.RUNNING),
            warning_run_count=runs_in(BackupRun.Status.WARNING, runs__reviewed_at__isnull=True),
            failed_run_count=runs_in(BackupRun.Status.FAILED, runs__reviewed_at__isnull=True),
        )
        .order_by("host")
    )
    hosts = list(annotated_hosts)

    # Attach the extra attributes the template reads from each host row.
    for entry in hosts:
        schedule = _schedule_for_host(entry)
        newest_first = entry.snapshots.select_related("base").order_by(
            "-started_at", "-discovered_at", "-id"
        )
        entry.latest_snapshot = newest_first.first()
        entry.next_run_at = _next_run_for_schedule(schedule, entry)
        entry.retention_warning = _retention_warning_for_host(entry, schedule)

    stats_summary = collect_dashboard_stats(hosts=hosts, global_config=global_config)

    unreviewed_runs = BackupRun.objects.filter(reviewed_at__isnull=True)
    counts = {
        "global_configs": GlobalConfig.objects.count(),
        "hosts": HostConfig.objects.count(),
        "enabled_hosts": HostConfig.objects.filter(enabled=True).count(),
        "schedules": ScheduleConfig.objects.count(),
        "enabled_schedules": ScheduleConfig.objects.filter(enabled=True).count(),
        "snapshots": SnapshotRecord.objects.count(),
        "runs": BackupRun.objects.count(),
        "queued_runs": BackupRun.objects.filter(status=BackupRun.Status.QUEUED).count(),
        "running_runs": BackupRun.objects.filter(status=BackupRun.Status.RUNNING).count(),
        "warning_runs": unreviewed_runs.filter(status=BackupRun.Status.WARNING).count(),
        "failed_runs": unreviewed_runs.filter(status=BackupRun.Status.FAILED).count(),
    }

    recent_runs = BackupRun.objects.select_related("host", "snapshot").order_by("-created_at")[:10]
    context = {
        "hosts": hosts,
        "global_config": global_config,
        "stats_summary": stats_summary,
        "scheduler_timezone": timezone.get_current_timezone_name(),
        "latest_runs": recent_runs,
        "counts": counts,
    }
    return render(request, "pobsync_backend/dashboard.html", context)
@staff_member_required
def changelog(request):
    """Render ``CHANGELOG.md`` from ``settings.BASE_DIR`` as parsed blocks.

    When the file does not exist, a placeholder sentence is parsed instead
    and ``missing`` is set in the template context.
    """
    changelog_path = Path(settings.BASE_DIR) / "CHANGELOG.md"
    missing = False
    try:
        changelog_text = changelog_path.read_text(encoding="utf-8")
    except FileNotFoundError:
        missing = True
        changelog_text = "CHANGELOG.md was not found in this installation."

    context = {
        "app_version": __version__,
        "changelog_blocks": _parse_changelog(changelog_text),
        "changelog_path": changelog_path,
        "missing": missing,
    }
    return render(request, "pobsync_backend/changelog.html", context)
@staff_member_required
def self_check(request):
    """Run the self checks and render them together with their summary."""
    results = collect_self_checks()
    context = {
        "checks": results,
        "summary": summarize_self_checks(results),
    }
    return render(request, "pobsync_backend/self_check.html", context)
@staff_member_required
def logs(request):
    """Render the logs page using the context built by ``_log_context``."""
    return render(request, "pobsync_backend/logs.html", _log_context(request))
@staff_member_required
def ssh_credentials(request):
    """List all SSH credentials ordered by name."""
    by_name = SshCredential.objects.order_by("name")
    return render(request, "pobsync_backend/ssh_credentials.html", {"credentials": by_name})
@staff_member_required
def create_ssh_credential(request):
    """Show the SSH credential form and save a new credential on valid POST.

    A successful save redirects to the credential list; otherwise the form
    (unbound for non-POST requests, bound with errors for invalid POSTs) is
    rendered.
    """
    submitted = request.method == "POST"
    form = SshCredentialForm(request.POST, request.FILES) if submitted else SshCredentialForm()

    if submitted and form.is_valid():
        credential = form.save()
        messages.success(request, f"SSH credential saved for {credential.name}.")
        return redirect("ssh_credentials")

    context = {"form": form, "credential": None}
    return render(request, "pobsync_backend/ssh_credential_form.html", context)
@staff_member_required
def generate_ssh_credential(request):
    """Create a credential row and generate an SSH key for it.

    If key generation raises ``SshKeyError`` the freshly created row is
    deleted again and the error is attached to the form. On success the
    credential can optionally become the default on the ``default`` global
    config (when that config exists).
    """
    if request.method != "POST":
        form = SshCredentialGenerateForm()
    else:
        form = SshCredentialGenerateForm(request.POST)
        if form.is_valid():
            data = form.cleaned_data
            new_row = SshCredential.objects.create(
                name=data["name"],
                key_type=data["key_type"],
                known_hosts=data["known_hosts"],
                notes=data["notes"],
            )
            try:
                credential = generate_ssh_key(new_row, key_type=data["key_type"])
            except SshKeyError as exc:
                # Roll back the row so a failed generation leaves nothing behind.
                new_row.delete()
                form.add_error(None, str(exc))
            else:
                if data["set_global_default"]:
                    global_config = GlobalConfig.objects.filter(name="default").first()
                    if global_config is not None:
                        global_config.default_ssh_credential = credential
                        global_config.save(update_fields=["default_ssh_credential", "updated_at"])
                messages.success(request, f"SSH key generated for {credential.name}.")
                return redirect("ssh_credentials")

    return render(request, "pobsync_backend/ssh_credential_generate.html", {"form": form})
@staff_member_required
def edit_ssh_credential(request, credential_id: int):
    """Edit an existing SSH credential; 404 when ``credential_id`` is unknown."""
    credential = get_object_or_404(SshCredential, id=credential_id)
    submitted = request.method == "POST"

    if submitted:
        form = SshCredentialForm(request.POST, request.FILES, instance=credential)
    else:
        form = SshCredentialForm(instance=credential)

    if submitted and form.is_valid():
        saved_credential = form.save()
        messages.success(request, f"SSH credential saved for {saved_credential.name}.")
        return redirect("ssh_credentials")

    context = {"form": form, "credential": credential}
    return render(request, "pobsync_backend/ssh_credential_form.html", context)
@staff_member_required
@require_POST
def delete_ssh_credential(request, credential_id: int):
    """Delete an SSH credential that is no longer referenced.

    Deletion is refused while any host or global config still points at the
    credential. Key files are removed first (for generated credentials or
    ones with a ``key_path``); if that fails the row is kept.
    """
    credential = get_object_or_404(SshCredential, id=credential_id)

    still_referenced = credential.hosts.exists() or credential.global_configs.exists()
    if still_referenced:
        messages.error(request, f"SSH key {credential.name} is still in use and cannot be deleted.")
        return redirect("edit_ssh_credential", credential_id=credential.id)

    # Remember the name now: it is used in messages after the row is gone.
    name = credential.name
    try:
        if credential.generated or credential.key_path:
            delete_generated_key_files(credential)
    except SshKeyError as exc:
        messages.error(request, f"Could not delete SSH key files for {name}: {exc}")
        return redirect("edit_ssh_credential", credential_id=credential.id)

    credential.delete()
    messages.success(request, f"SSH key deleted: {name}.")
    return redirect("ssh_credentials")
@staff_member_required
def edit_global_config(request):
    """Create or edit the global config named ``default``.

    When no such config exists yet, the GET form is pre-filled from
    ``_default_global_initial()`` and no config checks are collected.
    """
    global_config = GlobalConfig.objects.filter(name="default").first()

    if request.method != "POST":
        if global_config:
            form = GlobalConfigForm(instance=global_config)
        else:
            form = GlobalConfigForm(initial=_default_global_initial())
    else:
        form = GlobalConfigForm(request.POST, instance=global_config)
        if form.is_valid():
            saved_config = form.save()
            messages.success(request, f"Global config saved for {saved_config.name}.")
            return redirect("dashboard")

    config_checks = []
    if global_config:
        config_checks = collect_global_config_checks(global_config)

    context = {
        "global_config": global_config,
        "form": form,
        "backup_root": settings.POBSYNC_BACKUP_ROOT,
        "config_checks": config_checks,
        "config_check_summary": summarize_self_checks(config_checks),
    }
    return render(request, "pobsync_backend/global_form.html", context)
@staff_member_required
def create_host_config(request):
    """Create a host config and try to prepare its backup directories.

    On a valid POST the host is saved first. Directory preparation is
    best-effort: a failure is reported as a warning and does not undo the
    save. Exactly one flash message is emitted per outcome, followed by a
    single redirect to the host detail page.
    """
    if request.method == "POST":
        form = CreateHostConfigForm(request.POST)
        if form.is_valid():
            host_config = form.save()
            try:
                host_root = ensure_host_directories(host_config)
            except Exception as exc:
                # The host row already exists, so surface the directory
                # problem without failing the request.
                messages.warning(
                    request,
                    f"Host config created, but backup directories could not be prepared: {exc}",
                )
            else:
                messages.success(
                    request,
                    f"Host config created for {host_config.host}; prepared {host_root}.",
                )
            return redirect("host_detail", host=host_config.host)
    else:
        form = CreateHostConfigForm(initial=_default_host_initial())

    return render(
        request,
        "pobsync_backend/host_form.html",
        {
            "host": None,
            "form": form,
        },
    )
@staff_member_required
def host_detail(request, host: str):
    """Render the detail page for one host; 404 when ``host`` is unknown.

    Queueing a dry-run or real backup is offered only when the host is
    enabled, the ``default`` global config exists, the backup gate allows it
    and the host has no queued or running run.
    """
    host_config = get_object_or_404(HostConfig, host=host)
    global_config = GlobalConfig.objects.filter(name="default").first()
    has_global_config = global_config is not None
    schedule = _schedule_for_host(host_config)

    runs = host_config.runs
    # Oldest queued/running run, if any.
    active_run = (
        runs.filter(status__in=[BackupRun.Status.QUEUED, BackupRun.Status.RUNNING])
        .order_by("created_at", "id")
        .first()
    )

    backup_gate = collect_backup_gate(host_config, global_config)
    stats_summary = collect_host_stats(host=host_config, limit=10)

    raw_config = host_config.config
    last_preflight = raw_config.get("last_preflight") if isinstance(raw_config, dict) else {}

    effective_config = {}
    if global_config:
        effective_config = effective_host_config_preview(host_config, global_config)

    host_ready = host_config.enabled and has_global_config
    no_active_run = active_run is None

    counts = {
        "snapshots": host_config.snapshots.count(),
        "runs": runs.count(),
        "queued_runs": runs.filter(status=BackupRun.Status.QUEUED).count(),
        "running_runs": runs.filter(status=BackupRun.Status.RUNNING).count(),
        "failed_runs": runs.filter(
            status=BackupRun.Status.FAILED,
            reviewed_at__isnull=True,
        ).count(),
        "incomplete_snapshots": host_config.snapshots.filter(kind=SnapshotRecord.Kind.INCOMPLETE).count(),
    }

    context = {
        "host": host_config,
        "schedule": schedule,
        "retention_warning": _retention_warning_for_host(host_config, schedule),
        "next_run_at": _next_run_for_schedule(schedule, host_config),
        "scheduler_timezone": timezone.get_current_timezone_name(),
        "discovery": inspect_snapshot_discovery(host=host_config),
        "host_checks": backup_gate.checks,
        "host_check_summary": summarize_self_checks(backup_gate.checks),
        "backup_gate": backup_gate,
        "last_preflight": last_preflight,
        "effective_config": effective_config,
        "stats_summary": stats_summary,
        "manual_backup_form": ManualBackupForm(initial=_default_manual_backup_initial(host_config)),
        "can_queue_dry_run": host_ready and backup_gate.can_queue_dry_run and no_active_run,
        "can_queue_real_backup": host_ready and backup_gate.can_queue_real and no_active_run,
        "has_global_config": has_global_config,
        "active_run": active_run,
        "latest_runs": runs.select_related("snapshot").order_by("-created_at")[:10],
        "snapshots": host_config.snapshots.select_related("base").order_by("-started_at", "dirname")[:20],
        "counts": counts,
    }
    return render(request, "pobsync_backend/host_detail.html", context)
@staff_member_required
@require_POST
def prepare_host_directories(request, host: str):
    """Prepare the backup directories for a host and report the outcome.

    Any exception from ``ensure_host_directories`` becomes an error message;
    the view always redirects back to the host detail page.
    """
    host_config = get_object_or_404(HostConfig, host=host)
    name = host_config.host
    try:
        prepared_root = ensure_host_directories(host_config)
    except Exception as exc:
        messages.error(request, f"Could not prepare backup directories for {name}: {exc}")
    else:
        messages.success(request, f"Prepared backup directories for {name}: {prepared_root}")
    return redirect("host_detail", host=host_config.host)
@staff_member_required
@require_POST
def scan_host_known_key(request, host: str):
    """Scan a host's SSH host key and merge it into its credential.

    The credential and port come from the host config, falling back to the
    ``default`` global config; the port finally falls back to 22.
    """
    host_config = get_object_or_404(HostConfig, host=host)
    global_config = GlobalConfig.objects.filter(name="default").first()

    credential = host_config.ssh_credential
    if not credential:
        credential = global_config.default_ssh_credential if global_config else None
    if credential is None:
        messages.error(request, f"No SSH credential is selected for {host_config.host}.")
        return redirect("host_detail", host=host_config.host)

    port = host_config.ssh_port
    if not port:
        port = global_config.ssh_port if global_config else 22

    try:
        scanned = scan_known_host(host_config.address, port=int(port or 22))
    except SshKeyError as exc:
        messages.error(request, f"Could not scan SSH host key for {host_config.host}: {exc}")
    else:
        credential.known_hosts = merge_known_hosts(credential.known_hosts, scanned)
        credential.save(update_fields=["known_hosts", "updated_at"])
        messages.success(request, f"Stored SSH host key for {host_config.host} on credential {credential.name}.")
    return redirect("host_detail", host=host_config.host)
@staff_member_required
@require_POST
def run_host_preflight(request, host: str):
    """Run the remote connection preflight for a host and report the result.

    Refused for disabled hosts and when the ``default`` global config does
    not exist. A failing result lists the names of the checks that were not
    ok. Every path redirects to the host detail page.
    """
    host_config = get_object_or_404(HostConfig, host=host)

    def back_to_host():
        return redirect("host_detail", host=host_config.host)

    if not host_config.enabled:
        messages.error(request, f"Cannot run preflight for disabled host {host_config.host}.")
        return back_to_host()
    if not GlobalConfig.objects.filter(name="default").exists():
        messages.error(request, "Create the default global config before running preflight.")
        return back_to_host()

    try:
        result = run_remote_preflight(host_config)
    except Exception as exc:
        messages.error(request, f"Connection preflight failed for {host_config.host}: {exc}")
        return back_to_host()

    if result.get("ok"):
        messages.success(request, f"Connection preflight passed for {host_config.host}.")
        return back_to_host()

    # Only dict-shaped entries are inspected; anything else is ignored.
    failed = []
    for check in result.get("checks", []):
        if isinstance(check, dict) and not check.get("ok"):
            failed.append(str(check.get("name")))
    messages.error(request, f"Connection preflight failed for {host_config.host}: {', '.join(failed)}.")
    return back_to_host()
@staff_member_required
@require_POST
def queue_manual_backup(request, host: str):
    """Queue a manual backup run for a host from the submitted options.

    The request is rejected (with an error message and a redirect to the
    host page) when the host is disabled, the ``default`` global config is
    missing, the form is invalid, or the backup gate blocks the requested
    mode. On success the view redirects to the new run's detail page.
    """
    # NOTE(review): host_detail only enables the queue buttons when the host
    # has no queued/running run; this endpoint does not repeat that check
    # itself - confirm whether queue_backup_run enforces it.
    host_config = get_object_or_404(HostConfig, host=host)

    def back_to_host():
        return redirect("host_detail", host=host_config.host)

    if not host_config.enabled:
        messages.error(request, f"Cannot queue backup for disabled host {host_config.host}.")
        return back_to_host()

    global_config = GlobalConfig.objects.filter(name="default").first()
    if global_config is None:
        messages.error(request, "Create the default global config before queueing backups.")
        return back_to_host()

    form = ManualBackupForm(request.POST)
    if not form.is_valid():
        messages.error(request, "Manual backup options are invalid.")
        return back_to_host()

    options = form.cleaned_data
    backup_gate = collect_backup_gate(host_config, global_config)
    wants_dry_run = options["dry_run"]

    if wants_dry_run and not backup_gate.can_queue_dry_run:
        blockers = ", ".join(check.name for check in backup_gate.dry_run_blockers)
        messages.error(request, f"Cannot queue dry-run until failed preflight checks are resolved: {blockers}.")
        return back_to_host()
    if not wants_dry_run and not backup_gate.can_queue_real:
        blockers = ", ".join(check.name for check in backup_gate.real_blockers)
        messages.error(request, f"Cannot queue real backup until failed preflight checks are resolved: {blockers}.")
        return back_to_host()

    run = queue_backup_run(
        host=host_config,
        dry_run=options["dry_run"],
        verbose_output=options["verbose_output"],
        prune=options["prune"],
        prune_max_delete=options["prune_max_delete"],
        prune_protect_bases=options["prune_protect_bases"],
    )
    messages.success(request, f"Queued manual backup run {run.id} for {host_config.host}.")
    return redirect("run_detail", run_id=run.id)
@staff_member_required
def run_detail(request, run_id: int):
    """Render the detail page for a single backup run; 404 when unknown.

    ``run.result`` and each of its sections are read defensively: anything
    that is not a dict is replaced by an empty dict before use. The rsync
    log's existence is checked once so that every context value derived from
    it agrees.
    """
    run = get_object_or_404(BackupRun.objects.select_related("host", "snapshot"), id=run_id)
    result = run.result if isinstance(run.result, dict) else {}

    def section(key: str) -> dict:
        # Return result[key] when it is a dict, otherwise an empty dict.
        value = result.get(key)
        return value if isinstance(value, dict) else {}

    run_stats = section("stats")
    rsync_result = section("rsync")
    failure = section("failure")
    prune_result = section("prune")
    execution = section("execution")
    requested = section("requested")

    rsync_log_path = _run_rsync_log_path(run)
    rsync_log_tail = _run_rsync_log_tail(rsync_result, rsync_log_path)
    # Single filesystem check shared by the context and the dry-run summary.
    rsync_log_exists = bool(rsync_log_path and rsync_log_path.exists())

    context = {
        "run": run,
        "can_cancel": run.status in {BackupRun.Status.QUEUED, BackupRun.Status.RUNNING},
        "requested": requested,
        "execution": execution,
        "stats": run_stats,
        "rsync": rsync_result,
        "rsync_command": _run_rsync_command(rsync_result),
        "failure": failure,
        "failure_summary": failure.get("message") or failure.get("summary") or "",
        "prune_result": prune_result,
        "has_prune_result": bool(prune_result),
        "retention_warning": _retention_warning_for_host(run.host, _schedule_for_host(run.host)),
        "rsync_log_path": str(rsync_log_path) if rsync_log_path is not None else "",
        "rsync_log_exists": rsync_log_exists,
        "rsync_log_tail": rsync_log_tail,
        "dry_run_summary": _dry_run_summary(
            result=result,
            requested=requested,
            stats=run_stats,
            failure=failure,
            rsync_log_tail=rsync_log_tail,
            rsync_log_exists=rsync_log_exists,
        ),
        "result_json": _pretty_json(run.result),
    }
    return render(request, "pobsync_backend/run_detail.html", context)
@staff_member_required
def run_rsync_log(request, run_id: int):
    """Serve a run's rsync log as plain text.

    Raises ``Http404`` when the run is unknown, when no log path is known,
    when the path is not a regular file, or when the file disappears between
    the check and the open.
    """
    run = get_object_or_404(BackupRun.objects.select_related("host", "snapshot"), id=run_id)
    log_path = _run_rsync_log_path(run)
    if log_path is None or not log_path.is_file():
        raise Http404("Rsync log not found")
    try:
        log_file = log_path.open("rb")
    except FileNotFoundError as exc:
        # The file was removed after the is_file() check; answer with the
        # same 404 instead of an unhandled server error.
        raise Http404("Rsync log not found") from exc
    # FileResponse takes ownership of the handle and closes it.
    return FileResponse(log_file, content_type="text/plain; charset=utf-8")
@staff_member_required
@require_POST
def cancel_run(request, run_id: int):
    """Mark a queued or running backup run as cancelled.

    The cancellation request time and the previous status are recorded under
    ``result["cancellation"]``. A run that was still queued also gets its
    ``ended_at`` set here. Runs in any other state are left untouched and a
    warning is shown.
    """
    run = get_object_or_404(BackupRun.objects.select_related("host"), id=run_id)
    cancellable = {BackupRun.Status.QUEUED, BackupRun.Status.RUNNING}
    if run.status not in cancellable:
        messages.warning(request, f"Run {run.id} is already {run.status}.")
        return redirect("run_detail", run_id=run.id)

    previous_status = run.status
    # Copy so the stored result is replaced rather than mutated in place.
    updated_result = dict(run.result) if isinstance(run.result, dict) else {}
    updated_result["cancellation"] = {
        "requested_at": timezone.now().isoformat(),
        "previous_status": previous_status,
    }

    run.status = BackupRun.Status.CANCELLED
    run.result = updated_result
    changed = ["status", "result"]
    if previous_status == BackupRun.Status.QUEUED:
        run.ended_at = timezone.now()
        changed.append("ended_at")
    run.save(update_fields=changed)

    messages.success(request, f"Cancellation requested for run {run.id}.")
    return redirect("run_detail", run_id=run.id)
@staff_member_required
@require_POST
def resolve_run_review(request, run_id: int):
    """Mark a failed or warning run as reviewed by the current user.

    Runs in other states, and runs that already have ``reviewed_at`` set,
    are not modified; a message explains why. Always redirects to the run
    detail page.
    """
    run = get_object_or_404(BackupRun.objects.select_related("host"), id=run_id)
    reviewable = {BackupRun.Status.FAILED, BackupRun.Status.WARNING}

    if run.status not in reviewable:
        messages.warning(request, f"Run {run.id} does not need review.")
    elif run.reviewed_at:
        messages.info(request, f"Run {run.id} was already marked reviewed.")
    else:
        run.reviewed_at = timezone.now()
        run.reviewed_by = request.user.get_username()
        run.save(update_fields=["reviewed_at", "reviewed_by"])
        messages.success(request, f"Run {run.id} marked reviewed.")
    return redirect("run_detail", run_id=run.id)
@staff_member_required
@require_POST
def resolve_host_incomplete_reviews(request, host: str):
    """Mark all unreviewed incomplete snapshots of a host as reviewed.

    Uses one bulk ``update`` and reports how many records were changed.
    """
    host_config = get_object_or_404(HostConfig, host=host)
    pending = host_config.snapshots.filter(
        kind=SnapshotRecord.Kind.INCOMPLETE,
        reviewed_at__isnull=True,
    )
    reviewed_count = pending.update(
        reviewed_at=timezone.now(),
        reviewed_by=request.user.get_username(),
    )

    if not reviewed_count:
        messages.info(request, f"No incomplete snapshots needed review for {host_config.host}.")
    else:
        messages.success(request, f"Marked {reviewed_count} incomplete snapshot(s) reviewed for {host_config.host}.")
    return redirect("host_detail", host=host_config.host)
@staff_member_required
def snapshot_detail(request, snapshot_id: int):
    """Render the detail page for one snapshot record; 404 when unknown."""
    snapshot_qs = SnapshotRecord.objects.select_related("host", "base").prefetch_related(
        "derived_snapshots", "backup_runs"
    )
    snapshot = get_object_or_404(snapshot_qs, id=snapshot_id)
    restore = _snapshot_restore_guidance(snapshot)

    metadata = snapshot.metadata
    # Non-dict metadata yields an empty stats mapping.
    stats = metadata.get("stats") if isinstance(metadata, dict) else {}

    context = {
        "snapshot": snapshot,
        "stats": stats,
        "metadata_json": _pretty_json(snapshot.metadata),
        "backup_runs": snapshot.backup_runs.select_related("host").order_by("-created_at"),
        "derived_snapshots": snapshot.derived_snapshots.select_related("host").order_by("-started_at", "dirname"),
        "restore": restore,
    }
    return render(request, "pobsync_backend/snapshot_detail.html", context)
@staff_member_required
@require_POST
def discover_host_snapshots(request, host: str):
    """Run snapshot discovery for a host and report what was found.

    When nothing was scanned, the summary is shown as a warning together
    with the message from ``inspect_snapshot_discovery``.
    """
    host_config = get_object_or_404(HostConfig, host=host)
    detail_args = {"host": host_config.host}

    try:
        result = discover_snapshots(host=host_config)
    except Exception as exc:
        messages.error(request, f"Snapshot discovery failed for {host_config.host}: {exc}")
        return redirect("host_detail", **detail_args)

    summary = (
        f"Snapshot discovery scanned {result['scanned']} items for {host_config.host}: "
        f"{result['created']} created, {result['updated']} updated."
    )
    if not result["scanned"]:
        discovery = inspect_snapshot_discovery(host=host_config)
        messages.warning(request, f"{summary} {discovery['message']}")
    else:
        messages.success(request, summary)
    return redirect("host_detail", **detail_args)
@staff_member_required
def host_retention_plan(request, host: str):
    """Show the retention plan for a host with pre-filled confirmation forms.

    ``kind`` (query parameter, default ``scheduled``) must be ``scheduled``,
    ``manual`` or ``all``. ``protect_bases`` is on for the values ``1``,
    ``true``, ``on`` and ``yes``. Invalid input or a ``PobsyncError`` from
    planning redirects to the host page with an error message.
    """
    host_config = get_object_or_404(HostConfig, host=host)

    kind = request.GET.get("kind", "scheduled")
    if kind not in {"scheduled", "manual", "all"}:
        messages.error(request, "Retention kind must be scheduled, manual, or all.")
        return redirect("host_detail", host=host_config.host)

    protect_bases = request.GET.get("protect_bases") in {"1", "true", "on", "yes"}
    try:
        plan = run_sql_retention_plan(host=host_config.host, kind=kind, protect_bases=protect_bases)
    except PobsyncError as exc:
        messages.error(request, str(exc))
        return redirect("host_detail", host=host_config.host)

    schedule = _schedule_for_host(host_config)
    # The scheduled prune limit only applies when the schedule prunes at all.
    scheduled_prune_limit = None
    if schedule and schedule.prune:
        scheduled_prune_limit = schedule.prune_max_delete

    delete_count = len(plan["delete"])
    incomplete_count = len(plan["incomplete"])

    apply_form = RetentionApplyForm(
        host_name=host_config.host,
        expected_delete_count=delete_count,
        initial={
            "kind": kind,
            "protect_bases": protect_bases,
            "max_delete": delete_count,
            "confirm_delete_count": delete_count,
        },
    )
    incomplete_cleanup_form = IncompleteCleanupForm(
        host_name=host_config.host,
        expected_delete_count=incomplete_count,
        initial={
            "max_delete": incomplete_count,
            "confirm_delete_count": incomplete_count,
        },
    )

    context = {
        "host": host_config,
        "kind": kind,
        "protect_bases": protect_bases,
        "plan": plan,
        "schedule": schedule,
        "scheduled_prune_limit": scheduled_prune_limit,
        "scheduled_prune_exceeded": scheduled_prune_limit is not None and delete_count > scheduled_prune_limit,
        "apply_form": apply_form,
        "incomplete_cleanup_form": incomplete_cleanup_form,
    }
    return render(request, "pobsync_backend/retention_plan.html", context)
@staff_member_required
@require_POST
def apply_host_retention(request, host: str):
    """Apply the retention plan for a host after validating the confirmation.

    The plan is recomputed from the posted ``kind``/``protect_bases`` so the
    form can compare the operator's confirmation against the current delete
    count. The view always redirects back to the retention plan page; after
    a valid form the ``kind`` and ``protect_bases`` choices are kept in the
    query string.
    """
    host_config = get_object_or_404(HostConfig, host=host)
    posted_kind = request.POST.get("kind", "scheduled")
    posted_protect_bases = request.POST.get("protect_bases") in {"1", "true", "on", "yes"}

    # Stays None for an unknown kind; the form decides how to reject that.
    expected_delete_count = None
    if posted_kind in {"scheduled", "manual", "all"}:
        try:
            plan = run_sql_retention_plan(
                host=host_config.host,
                kind=posted_kind,
                protect_bases=posted_protect_bases,
            )
        except PobsyncError as exc:
            messages.error(request, str(exc))
            return redirect("host_retention_plan", host=host_config.host)
        expected_delete_count = len(plan.get("delete") or [])

    form = RetentionApplyForm(
        request.POST,
        host_name=host_config.host,
        expected_delete_count=expected_delete_count,
    )
    if not form.is_valid():
        messages.error(request, "Retention apply confirmation is invalid.")
        return redirect("host_retention_plan", host=host_config.host)

    cleaned = form.cleaned_data
    kind = cleaned["kind"]
    protect_bases = bool(cleaned["protect_bases"])
    try:
        result = run_sql_retention_apply(
            prefix=Path(settings.POBSYNC_HOME),
            host=host_config.host,
            kind=kind,
            protect_bases=protect_bases,
            yes=True,
            max_delete=cleaned["max_delete"],
        )
    except PobsyncError as exc:
        messages.error(request, str(exc))
    else:
        messages.success(request, f"Retention deleted {len(result['deleted'])} snapshot(s) for {host_config.host}.")

    # Re-open the plan page with the same options the operator just used.
    query_parts = [f"kind={kind}"]
    if protect_bases:
        query_parts.append("protect_bases=1")
    query = "&".join(query_parts)

    target = redirect("host_retention_plan", host=host_config.host)
    target["Location"] = f"{target['Location']}?{query}"
    return target
@staff_member_required
@require_POST
def cleanup_host_incomplete_snapshots(request, host: str):
    """Delete a host's incomplete snapshots after validating the confirmation.

    The expected count comes from a fresh ``all`` retention plan with base
    protection on. Every path redirects back to the retention plan page.
    """
    host_config = get_object_or_404(HostConfig, host=host)

    def back_to_plan():
        return redirect("host_retention_plan", host=host_config.host)

    try:
        plan = run_sql_retention_plan(host=host_config.host, kind="all", protect_bases=True)
    except PobsyncError as exc:
        messages.error(request, str(exc))
        return back_to_plan()

    incomplete_count = len(plan.get("incomplete") or [])
    form = IncompleteCleanupForm(
        request.POST,
        host_name=host_config.host,
        expected_delete_count=incomplete_count,
    )
    if not form.is_valid():
        messages.error(request, "Incomplete cleanup confirmation is invalid.")
        return back_to_plan()

    try:
        result = run_incomplete_cleanup(
            prefix=Path(settings.POBSYNC_HOME),
            host=host_config.host,
            yes=True,
            max_delete=form.cleaned_data["max_delete"],
        )
    except PobsyncError as exc:
        messages.error(request, str(exc))
    else:
        messages.success(request, f"Deleted {len(result['deleted'])} incomplete snapshot(s) for {host_config.host}.")
    return back_to_plan()
@staff_member_required
def edit_host_config(request, host: str):
    """Edit an existing host config; 404 when ``host`` is unknown.

    Effective config checks are collected (only when the ``default`` global
    config exists) before the form is bound, and shown next to the form.
    """
    host_config = get_object_or_404(HostConfig, host=host)
    global_config = GlobalConfig.objects.filter(name="default").first()

    config_checks = []
    if global_config:
        config_checks = collect_effective_host_config_checks(host_config, global_config)

    if request.method != "POST":
        form = HostConfigForm(instance=host_config)
    else:
        form = HostConfigForm(request.POST, instance=host_config)
        if form.is_valid():
            form.save()
            messages.success(request, f"Host config saved for {host_config.host}.")
            return redirect("host_detail", host=host_config.host)

    context = {
        "host": host_config,
        "form": form,
        "config_checks": config_checks,
        "config_check_summary": summarize_self_checks(config_checks),
    }
    return render(request, "pobsync_backend/host_form.html", context)
@staff_member_required
def edit_host_schedule(request, host: str):
    """Create or edit the schedule attached to a host.

    A host without a schedule gets a GET form pre-filled from
    ``_default_schedule_initial()``. On save the schedule is always bound to
    this host before it is written.
    """
    host_config = get_object_or_404(HostConfig, host=host)
    schedule = _schedule_for_host(host_config)

    if request.method != "POST":
        if schedule:
            form = ScheduleConfigForm(instance=schedule)
        else:
            form = ScheduleConfigForm(initial=_default_schedule_initial())
    else:
        form = ScheduleConfigForm(request.POST, instance=schedule)
        if form.is_valid():
            # Defer the write so the host can be attached first.
            saved_schedule = form.save(commit=False)
            saved_schedule.host = host_config
            saved_schedule.save()
            messages.success(request, f"Schedule saved for {host_config.host}.")
            return redirect("host_detail", host=host_config.host)

    context = {
        "host": host_config,
        "schedule": schedule,
        "form": form,
    }
    return render(request, "pobsync_backend/schedule_form.html", context)
def _schedule_for_host(host_config: HostConfig) -> ScheduleConfig | None:
    """Return the host's schedule, or ``None`` when it has none."""
    try:
        schedule = host_config.schedule
    except ScheduleConfig.DoesNotExist:
        # Accessing the related schedule raises when no row exists.
        schedule = None
    return schedule
def _next_run_for_host(host_config: HostConfig):
    """Return the next due time for the host's own schedule, if any."""
    schedule = _schedule_for_host(host_config)
    return _next_run_for_schedule(schedule, host_config)
def _next_run_for_schedule(schedule: ScheduleConfig | None, host_config: HostConfig):
    """Return when ``schedule`` is next due, or ``None``.

    ``None`` is returned when there is no schedule, when the schedule or the
    host is disabled, or when ``next_due_after`` raises ``ValueError`` for
    the cron expression.
    """
    is_active = schedule is not None and schedule.enabled and host_config.enabled
    if not is_active:
        return None

    now_local = timezone.localtime(timezone.now())
    try:
        return next_due_after(schedule.cron_expr, now_local)
    except ValueError:
        return None
def _retention_warning_for_host(host_config: HostConfig, schedule: ScheduleConfig | None) -> dict[str, object]:
    """Build the retention warning summary for a host.

    The result always has ``has_warning`` and ``incomplete_count`` (the
    number of unreviewed incomplete snapshots). When the host is enabled and
    its schedule prunes, the scheduled retention plan is evaluated too: a
    planning error is reported under ``error``; otherwise ``delete_count``,
    ``max_delete``, ``protect_bases`` and ``prune_exceeded`` are added.
    """
    unreviewed_incomplete = host_config.snapshots.filter(
        kind=SnapshotRecord.Kind.INCOMPLETE,
        reviewed_at__isnull=True,
    )
    incomplete_count = unreviewed_incomplete.count()
    warning: dict[str, object] = {
        "has_warning": incomplete_count > 0,
        "incomplete_count": incomplete_count,
    }

    prunes_on_schedule = schedule is not None and schedule.prune and host_config.enabled
    if not prunes_on_schedule:
        return warning

    protect_bases = bool(schedule.prune_protect_bases)
    try:
        plan = run_sql_retention_plan(
            host=host_config.host,
            kind="scheduled",
            protect_bases=protect_bases,
        )
    except PobsyncError as exc:
        warning["has_warning"] = True
        warning["error"] = str(exc)
        return warning

    delete_count = len(plan.get("delete") or [])
    prune_exceeded = delete_count > schedule.prune_max_delete
    warning["delete_count"] = delete_count
    warning["max_delete"] = schedule.prune_max_delete
    warning["protect_bases"] = protect_bases
    warning["prune_exceeded"] = prune_exceeded
    if prune_exceeded:
        # More deletions planned than the scheduled prune limit allows.
        warning["has_warning"] = True
    return warning
def _default_schedule_initial() -> dict[str, object]:
return {
"cron_expr": "15 2 * * *",
"enabled": True,
"prune_max_delete": 10,
}
def _default_global_initial() -> dict[str, object]:
    """Return the initial form values for creating the global config.

    The default SSH credential is the first credential by name, or ``None``
    when no credential exists.
    """
    first_credential = SshCredential.objects.order_by("name").first()
    return dict(
        name="default",
        default_ssh_credential=first_credential,
        ssh_user="root",
        ssh_port=22,
        rsync_binary="rsync",
        rsync_args=DEFAULT_RSYNC_ARGS,
        default_source_root="/",
        excludes_default=DEFAULT_EXCLUDES,
        retention_daily=14,
        retention_weekly=8,
        retention_monthly=12,
        retention_yearly=0,
    )
def _default_host_initial() -> dict[str, object]:
    """Return the initial form values for creating a host config.

    Values are copied from the ``default`` global config when it exists;
    otherwise only ``enabled`` and fixed retention numbers are provided.
    """
    global_config = GlobalConfig.objects.filter(name="default").first()
    if global_config is None:
        return {
            "enabled": True,
            "retention_daily": 14,
            "retention_weekly": 8,
            "retention_monthly": 12,
            "retention_yearly": 0,
        }

    initial: dict[str, object] = {"enabled": True}
    initial["ssh_credential"] = global_config.default_ssh_credential
    initial["ssh_user"] = global_config.ssh_user
    initial["ssh_port"] = global_config.ssh_port
    initial["source_root"] = global_config.default_source_root
    initial["retention_daily"] = global_config.retention_daily
    initial["retention_weekly"] = global_config.retention_weekly
    initial["retention_monthly"] = global_config.retention_monthly
    initial["retention_yearly"] = global_config.retention_yearly
    return initial
def _default_manual_backup_initial(host_config: HostConfig) -> dict[str, object]:
    """Return the initial values for the manual backup form.

    Dry-run is always pre-selected. The prune options mirror the host's
    schedule when it has one, and otherwise default to no pruning with a
    delete limit of 10.
    """
    schedule = _schedule_for_host(host_config)
    if not schedule:
        return {
            "dry_run": True,
            "prune": False,
            "prune_max_delete": 10,
            "prune_protect_bases": False,
        }
    return {
        "dry_run": True,
        "prune": bool(schedule.prune),
        "prune_max_delete": schedule.prune_max_delete,
        "prune_protect_bases": bool(schedule.prune_protect_bases),
    }
def _pretty_json(value: object) -> str:
return json.dumps(value or {}, indent=2, sort_keys=True)
def _parse_changelog(text: str) -> list[dict[str, object]]:
blocks: list[dict[str, object]] = []
paragraph: list[str] = []
list_items: list[str] = []
def flush_paragraph() -> None:
if paragraph:
blocks.append({"kind": "paragraph", "text": " ".join(paragraph)})
paragraph.clear()
def flush_list() -> None:
if list_items:
blocks.append({"kind": "list", "items": list(list_items)})
list_items.clear()
for raw_line in text.splitlines():
line = raw_line.strip()
if not line:
flush_paragraph()
flush_list()
continue
if line.startswith("#"):
flush_paragraph()
flush_list()
marker, _space, heading = line.partition(" ")
level = min(max(len(marker), 1), 3)
blocks.append({"kind": "heading", "level": level, "text": heading.strip() or line.lstrip("#").strip()})
continue
if line.startswith("- "):
flush_paragraph()
list_items.append(line[2:].strip())
continue
flush_list()
paragraph.append(line)
flush_paragraph()
flush_list()
return blocks
def _snapshot_restore_guidance(snapshot: SnapshotRecord) -> dict[str, str]:
    """Build example paths and rsync commands for restoring *snapshot*.

    The source is the ``data`` directory under ``snapshot.path`` and the local
    destination is ``/restore/<host>``. Directory arguments are quoted via
    ``_quote_path_with_trailing_slash``; single-file and remote arguments are
    quoted with ``shlex.quote``. The remote example targets ``root@`` the
    host's ``address``, falling back to its ``host`` name.

    Returns:
        A mapping of display paths and ready-to-copy command strings.
    """
    rsync = "rsync -aHAX --numeric-ids --info=progress2"
    rsync_dry = f"{rsync} --dry-run"

    host = snapshot.host
    data_dir = Path(snapshot.path) / "data"
    restore_dir = Path("/restore") / host.host
    sample_dir = Path("etc") / "nginx"
    sample_file = Path("home") / "example" / "site" / "public_html" / "index.php"

    src = _quote_path_with_trailing_slash(data_dir)
    dst = _quote_path_with_trailing_slash(restore_dir)
    src_dir = _quote_path_with_trailing_slash(data_dir / sample_dir)
    dst_dir = _quote_path_with_trailing_slash(restore_dir / sample_dir)
    # A single file must not get a trailing slash, so quote it directly.
    src_file = shlex.quote(str(data_dir / sample_file))
    dst_file = shlex.quote(str(restore_dir / sample_file))
    remote = shlex.quote(f"root@{host.address or host.host}:/")

    return {
        "source_path": str(data_dir),
        "destination_path": str(restore_dir),
        "example_relative_path": str(sample_dir),
        "example_file_relative_path": str(sample_file),
        "inspect_command": f"ls -la {src}",
        "dry_run_command": f"{rsync_dry} {src} {dst}",
        "local_command": f"{rsync} {src} {dst}",
        "partial_dry_run_command": f"{rsync_dry} {src_dir} {dst_dir}",
        "file_dry_run_command": f"{rsync_dry} {src_file} {dst_file}",
        "remote_dry_run_command": f"{rsync_dry} {src} {remote}",
    }
def _quote_path_with_trailing_slash(path: Path) -> str:
return shlex.quote(str(path).rstrip("/") + "/")
def _run_rsync_log_path(run: BackupRun) -> Path | None:
if isinstance(run.result, dict):
log = run.result.get("log")
if isinstance(log, str) and log:
return Path(log)
execution = run.result.get("execution")
if isinstance(execution, dict):
execution_log = execution.get("log")
if isinstance(execution_log, str) and execution_log:
return Path(execution_log)
if run.snapshot_path:
return Path(run.snapshot_path) / "meta" / "rsync.log"
return None
def _run_rsync_command(rsync_result: dict) -> list[str]:
command = rsync_result.get("command")
if not isinstance(command, list):
return []
return [str(part) for part in command]
def _run_rsync_log_tail(rsync_result: dict, log_path: Path | None, *, max_lines: int = 30) -> list[str]:
log_tail = rsync_result.get("log_tail")
if isinstance(log_tail, list):
return [str(line) for line in log_tail[-max_lines:]]
if log_path is None or not log_path.is_file():
return []
try:
with log_path.open("r", encoding="utf-8", errors="replace") as handle:
return handle.read().splitlines()[-max_lines:]
except OSError:
return []
def _dry_run_summary(
*,
result: dict,
requested: dict,
stats: dict,
failure: dict,
rsync_log_tail: list[str],
rsync_log_exists: bool,
) -> dict[str, object]:
if not (result.get("dry_run") or requested.get("dry_run")):
return {}
rsync_stats = stats.get("rsync") if isinstance(stats.get("rsync"), dict) else {}
warnings = []
if failure:
message = failure.get("message") or failure.get("summary")
hint = failure.get("hint")
if message:
warnings.append(str(message))
if hint:
warnings.append(str(hint))
for line in rsync_log_tail:
lowered = line.lower()
if "warning" in lowered or "permission denied" in lowered or "failed" in lowered:
warnings.append(line)
return {
"ok": result.get("ok"),
"status": "passed" if result.get("ok") else ("failed" if result.get("ok") is False else "running"),
"highlight_class": "success" if result.get("ok") else ("failed" if result.get("ok") is False else "warning"),
"files_seen": rsync_stats.get("files_total"),
"files_would_transfer": rsync_stats.get("files_transferred"),
"total_file_size_bytes": rsync_stats.get("total_file_size_bytes"),
"transfer_estimate_bytes": rsync_stats.get("total_transferred_file_size_bytes")
or rsync_stats.get("literal_data_bytes"),
"link_dest_estimated_savings_bytes": rsync_stats.get("link_dest_estimated_savings_bytes"),
"duration_seconds": stats.get("duration_seconds"),
"log_available": rsync_log_exists,
"warnings": list(dict.fromkeys(warnings)),
}
def _log_context(request) -> dict[str, object]:
    """Collect journal log lines and the active filter state for *request*.

    The ``unit``, ``priority``, ``window``, ``host``, ``run`` and ``q`` query
    parameters are read from ``request.GET``. ``journalctl`` is asked for at
    most 300 lines from the pobsync web, worker and scheduler units, and the
    output is narrowed with ``_filter_log_lines``.

    Problems reading the journal (binary missing, timeout, failure to start,
    non-zero exit) are reported through the ``error`` entry instead of being
    raised.

    Returns:
        A mapping with the selectable options (``units``, ``priorities``,
        ``time_windows``), the selected values, the filtered ``lines`` and an
        ``error`` message (empty when the journal was read successfully).
    """
    units = ("pobsync-web.service", "pobsync-worker.service", "pobsync-scheduler.service")
    priorities = {
        "": "All",
        "0..3": "Errors",
        "4": "Warnings",
        "5": "Notices",
        "6": "Info",
        "7": "Debug",
    }
    time_windows = {
        "1h": "Last hour",
        "6h": "Last 6 hours",
        "24h": "Last 24 hours",
        "7d": "Last 7 days",
        "": "All available",
    }
    since_values = {
        "1h": "1 hour ago",
        "6h": "6 hours ago",
        "24h": "24 hours ago",
        "7d": "7 days ago",
    }

    selected_unit = request.GET.get("unit", "")
    priority = request.GET.get("priority", "0..4")
    time_window = request.GET.get("window", "24h")
    if time_window not in time_windows:
        time_window = "24h"
    host_filter = request.GET.get("host", "").strip()
    run_filter = request.GET.get("run", "").strip()
    query = request.GET.get("q", "").strip()

    lines: list[str] = []
    error = ""
    if shutil.which("journalctl") is None:
        error = "journalctl is not available in this runtime."
    else:
        command = ["journalctl", "--no-pager", "-n", "300", "-o", "short-iso"]
        if time_window:
            command.extend(["--since", since_values[time_window]])
        if selected_unit in units:
            command.extend(["-u", selected_unit])
        else:
            # Unknown or empty unit: show all pobsync units.
            for unit in units:
                command.extend(["-u", unit])
        if priority:
            command.extend(["-p", priority])
        try:
            result = subprocess.run(
                command,
                check=False,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                timeout=10,
            )
        except subprocess.TimeoutExpired:
            # A slow journal must not turn the page into a server error.
            error = "Timed out while reading journal logs."
        except OSError as exc:
            # The binary may vanish or be non-executable after the which() check.
            error = f"Could not run journalctl: {exc}"
        else:
            if result.returncode != 0:
                error = result.stderr.strip() or "Could not read journal logs."
            else:
                lines = result.stdout.splitlines()
                lines = _filter_log_lines(lines, query=query, host=host_filter, run_id=run_filter)

    return {
        "units": units,
        "priorities": priorities,
        "time_windows": time_windows,
        "selected_unit": selected_unit,
        "selected_priority": priority,
        "selected_window": time_window,
        "host_filter": host_filter,
        "run_filter": run_filter,
        "query": query,
        "lines": lines,
        "error": error,
    }
def _filter_log_lines(lines: list[str], *, query: str, host: str, run_id: str) -> list[str]:
filters = []
if query:
filters.append(lambda line: query.lower() in line.lower())
if host:
filters.append(lambda line: host.lower() in line.lower())
if run_id:
run_tokens = (
f"run {run_id}",
f"run={run_id}",
f"run_id={run_id}",
f"run-{run_id}",
f"#{run_id}",
)
filters.append(lambda line: any(token in line.lower() for token in run_tokens))
if not filters:
return lines
return [line for line in lines if all(matches(line) for matches in filters)]