from __future__ import annotations import json import shlex import shutil import subprocess from pathlib import Path from django.contrib import messages from django.contrib.admin.views.decorators import staff_member_required from django.conf import settings from django.http import FileResponse, Http404 from django.db.models import Count, Q from django.shortcuts import get_object_or_404, redirect, render from django.utils import timezone from django.views.decorators.http import require_POST from pobsync import __version__ from pobsync.errors import PobsyncError from pobsync.config.defaults import DEFAULT_EXCLUDES, DEFAULT_RSYNC_ARGS from .backup_runner import queue_backup_run from .config_checks import collect_effective_host_config_checks, collect_global_config_checks from .forms import ( CreateHostConfigForm, GlobalConfigForm, HostConfigForm, IncompleteCleanupForm, ManualBackupForm, RetentionApplyForm, SshCredentialGenerateForm, ScheduleConfigForm, SshCredentialForm, ) from .host_ops import ensure_host_directories from .models import BackupRun, GlobalConfig, HostConfig, ScheduleConfig, SnapshotRecord, SshCredential from .preflight import collect_backup_gate, effective_host_config_preview, run_remote_preflight from .retention import run_incomplete_cleanup, run_sql_retention_apply, run_sql_retention_plan from .self_check import collect_self_checks, summarize_self_checks from .scheduler import next_due_after from .snapshot_discovery import discover_snapshots, inspect_snapshot_discovery from .ssh_keys import SshKeyError, delete_generated_key_files, generate_ssh_key, merge_known_hosts, scan_known_host from .stats_summary import collect_dashboard_stats, collect_host_stats @staff_member_required def dashboard(request): global_config = GlobalConfig.objects.filter(name="default").first() hosts = list( HostConfig.objects.select_related("schedule") .annotate( snapshot_count=Count("snapshots", distinct=True), run_count=Count("runs", distinct=True), 
queued_run_count=Count("runs", filter=Q(runs__status=BackupRun.Status.QUEUED), distinct=True), running_run_count=Count("runs", filter=Q(runs__status=BackupRun.Status.RUNNING), distinct=True), warning_run_count=Count("runs", filter=Q(runs__status=BackupRun.Status.WARNING), distinct=True), failed_run_count=Count("runs", filter=Q(runs__status=BackupRun.Status.FAILED), distinct=True), ) .order_by("host") ) for host_config in hosts: host_config.latest_snapshot = ( host_config.snapshots.select_related("base") .order_by("-started_at", "-discovered_at", "-id") .first() ) host_config.next_run_at = _next_run_for_host(host_config) host_config.retention_warning = _retention_warning_for_host(host_config, _schedule_for_host(host_config)) stats_summary = collect_dashboard_stats(hosts=hosts, global_config=global_config) context = { "hosts": hosts, "global_config": global_config, "stats_summary": stats_summary, "scheduler_timezone": timezone.get_current_timezone_name(), "latest_runs": BackupRun.objects.select_related("host", "snapshot").order_by("-created_at")[:10], "counts": { "global_configs": GlobalConfig.objects.count(), "hosts": HostConfig.objects.count(), "enabled_hosts": HostConfig.objects.filter(enabled=True).count(), "schedules": ScheduleConfig.objects.count(), "enabled_schedules": ScheduleConfig.objects.filter(enabled=True).count(), "snapshots": SnapshotRecord.objects.count(), "runs": BackupRun.objects.count(), "queued_runs": BackupRun.objects.filter(status=BackupRun.Status.QUEUED).count(), "running_runs": BackupRun.objects.filter(status=BackupRun.Status.RUNNING).count(), "warning_runs": BackupRun.objects.filter(status=BackupRun.Status.WARNING).count(), "failed_runs": BackupRun.objects.filter(status=BackupRun.Status.FAILED).count(), }, } return render(request, "pobsync_backend/dashboard.html", context) @staff_member_required def changelog(request): changelog_path = Path(settings.BASE_DIR) / "CHANGELOG.md" try: changelog_text = changelog_path.read_text(encoding="utf-8") 
missing = False except FileNotFoundError: changelog_text = "CHANGELOG.md was not found in this installation." missing = True return render( request, "pobsync_backend/changelog.html", { "app_version": __version__, "changelog_blocks": _parse_changelog(changelog_text), "changelog_path": changelog_path, "missing": missing, }, ) @staff_member_required def self_check(request): checks = collect_self_checks() return render( request, "pobsync_backend/self_check.html", { "checks": checks, "summary": summarize_self_checks(checks), }, ) @staff_member_required def logs(request): context = _log_context(request) return render(request, "pobsync_backend/logs.html", context) @staff_member_required def ssh_credentials(request): context = { "credentials": SshCredential.objects.order_by("name"), } return render(request, "pobsync_backend/ssh_credentials.html", context) @staff_member_required def create_ssh_credential(request): if request.method == "POST": form = SshCredentialForm(request.POST, request.FILES) if form.is_valid(): credential = form.save() messages.success(request, f"SSH credential saved for {credential.name}.") return redirect("ssh_credentials") else: form = SshCredentialForm() return render( request, "pobsync_backend/ssh_credential_form.html", { "form": form, "credential": None, }, ) @staff_member_required def generate_ssh_credential(request): if request.method == "POST": form = SshCredentialGenerateForm(request.POST) if form.is_valid(): credential = SshCredential.objects.create( name=form.cleaned_data["name"], key_type=form.cleaned_data["key_type"], known_hosts=form.cleaned_data["known_hosts"], notes=form.cleaned_data["notes"], ) try: credential = generate_ssh_key(credential, key_type=form.cleaned_data["key_type"]) except SshKeyError as exc: credential.delete() form.add_error(None, str(exc)) else: if form.cleaned_data["set_global_default"]: global_config = GlobalConfig.objects.filter(name="default").first() if global_config is not None: 
global_config.default_ssh_credential = credential global_config.save(update_fields=["default_ssh_credential", "updated_at"]) messages.success(request, f"SSH key generated for {credential.name}.") return redirect("ssh_credentials") else: form = SshCredentialGenerateForm() return render( request, "pobsync_backend/ssh_credential_generate.html", { "form": form, }, ) @staff_member_required def edit_ssh_credential(request, credential_id: int): credential = get_object_or_404(SshCredential, id=credential_id) if request.method == "POST": form = SshCredentialForm(request.POST, request.FILES, instance=credential) if form.is_valid(): saved_credential = form.save() messages.success(request, f"SSH credential saved for {saved_credential.name}.") return redirect("ssh_credentials") else: form = SshCredentialForm(instance=credential) return render( request, "pobsync_backend/ssh_credential_form.html", { "form": form, "credential": credential, }, ) @staff_member_required @require_POST def delete_ssh_credential(request, credential_id: int): credential = get_object_or_404(SshCredential, id=credential_id) if credential.hosts.exists() or credential.global_configs.exists(): messages.error(request, f"SSH key {credential.name} is still in use and cannot be deleted.") return redirect("edit_ssh_credential", credential_id=credential.id) name = credential.name try: if credential.generated or credential.key_path: delete_generated_key_files(credential) except SshKeyError as exc: messages.error(request, f"Could not delete SSH key files for {name}: {exc}") return redirect("edit_ssh_credential", credential_id=credential.id) credential.delete() messages.success(request, f"SSH key deleted: {name}.") return redirect("ssh_credentials") @staff_member_required def edit_global_config(request): global_config = GlobalConfig.objects.filter(name="default").first() if request.method == "POST": form = GlobalConfigForm(request.POST, instance=global_config) if form.is_valid(): saved_config = form.save() 
messages.success(request, f"Global config saved for {saved_config.name}.") return redirect("dashboard") else: form = GlobalConfigForm(instance=global_config) if global_config else GlobalConfigForm(initial=_default_global_initial()) config_checks = collect_global_config_checks(global_config) if global_config else [] return render( request, "pobsync_backend/global_form.html", { "global_config": global_config, "form": form, "backup_root": settings.POBSYNC_BACKUP_ROOT, "config_checks": config_checks, "config_check_summary": summarize_self_checks(config_checks), }, ) @staff_member_required def create_host_config(request): if request.method == "POST": form = CreateHostConfigForm(request.POST) if form.is_valid(): host_config = form.save() try: host_root = ensure_host_directories(host_config) except Exception as exc: messages.warning(request, f"Host config created, but backup directories could not be prepared: {exc}") else: messages.success(request, f"Host config created for {host_config.host}; prepared {host_root}.") return redirect("host_detail", host=host_config.host) messages.success(request, f"Host config created for {host_config.host}.") return redirect("host_detail", host=host_config.host) else: form = CreateHostConfigForm(initial=_default_host_initial()) return render( request, "pobsync_backend/host_form.html", { "host": None, "form": form, }, ) @staff_member_required def host_detail(request, host: str): host_config = get_object_or_404(HostConfig, host=host) global_config = GlobalConfig.objects.filter(name="default").first() schedule = _schedule_for_host(host_config) queued_runs = host_config.runs.filter(status=BackupRun.Status.QUEUED) running_runs = host_config.runs.filter(status=BackupRun.Status.RUNNING) active_run = host_config.runs.filter( status__in=[BackupRun.Status.QUEUED, BackupRun.Status.RUNNING] ).order_by("created_at", "id").first() has_global_config = global_config is not None backup_gate = collect_backup_gate(host_config, global_config) stats_summary = 
collect_host_stats(host=host_config, limit=10) context = { "host": host_config, "schedule": schedule, "retention_warning": _retention_warning_for_host(host_config, schedule), "next_run_at": _next_run_for_schedule(schedule, host_config), "scheduler_timezone": timezone.get_current_timezone_name(), "discovery": inspect_snapshot_discovery(host=host_config), "host_checks": backup_gate.checks, "host_check_summary": summarize_self_checks(backup_gate.checks), "backup_gate": backup_gate, "last_preflight": (host_config.config or {}).get("last_preflight") if isinstance(host_config.config, dict) else {}, "effective_config": effective_host_config_preview(host_config, global_config) if global_config else {}, "stats_summary": stats_summary, "manual_backup_form": ManualBackupForm(initial=_default_manual_backup_initial(host_config)), "can_queue_dry_run": host_config.enabled and has_global_config and backup_gate.can_queue_dry_run and active_run is None, "can_queue_real_backup": host_config.enabled and has_global_config and backup_gate.can_queue_real and active_run is None, "has_global_config": has_global_config, "active_run": active_run, "latest_runs": host_config.runs.select_related("snapshot").order_by("-created_at")[:10], "snapshots": host_config.snapshots.select_related("base").order_by("-started_at", "dirname")[:20], "counts": { "snapshots": host_config.snapshots.count(), "runs": host_config.runs.count(), "queued_runs": queued_runs.count(), "running_runs": running_runs.count(), "failed_runs": host_config.runs.filter(status=BackupRun.Status.FAILED).count(), "incomplete_snapshots": host_config.snapshots.filter(kind=SnapshotRecord.Kind.INCOMPLETE).count(), }, } return render(request, "pobsync_backend/host_detail.html", context) @staff_member_required @require_POST def prepare_host_directories(request, host: str): host_config = get_object_or_404(HostConfig, host=host) try: host_root = ensure_host_directories(host_config) except Exception as exc: messages.error(request, f"Could not 
prepare backup directories for {host_config.host}: {exc}") else: messages.success(request, f"Prepared backup directories for {host_config.host}: {host_root}") return redirect("host_detail", host=host_config.host) @staff_member_required @require_POST def scan_host_known_key(request, host: str): host_config = get_object_or_404(HostConfig, host=host) global_config = GlobalConfig.objects.filter(name="default").first() credential = host_config.ssh_credential or (global_config.default_ssh_credential if global_config else None) if credential is None: messages.error(request, f"No SSH credential is selected for {host_config.host}.") return redirect("host_detail", host=host_config.host) port = host_config.ssh_port or (global_config.ssh_port if global_config else 22) try: scanned = scan_known_host(host_config.address, port=int(port or 22)) except SshKeyError as exc: messages.error(request, f"Could not scan SSH host key for {host_config.host}: {exc}") else: credential.known_hosts = merge_known_hosts(credential.known_hosts, scanned) credential.save(update_fields=["known_hosts", "updated_at"]) messages.success(request, f"Stored SSH host key for {host_config.host} on credential {credential.name}.") return redirect("host_detail", host=host_config.host) @staff_member_required @require_POST def run_host_preflight(request, host: str): host_config = get_object_or_404(HostConfig, host=host) if not host_config.enabled: messages.error(request, f"Cannot run preflight for disabled host {host_config.host}.") return redirect("host_detail", host=host_config.host) if not GlobalConfig.objects.filter(name="default").exists(): messages.error(request, "Create the default global config before running preflight.") return redirect("host_detail", host=host_config.host) try: result = run_remote_preflight(host_config) except Exception as exc: messages.error(request, f"Connection preflight failed for {host_config.host}: {exc}") else: if result.get("ok"): messages.success(request, f"Connection preflight 
passed for {host_config.host}.") else: failed = [ str(check.get("name")) for check in result.get("checks", []) if isinstance(check, dict) and not check.get("ok") ] messages.error(request, f"Connection preflight failed for {host_config.host}: {', '.join(failed)}.") return redirect("host_detail", host=host_config.host) @staff_member_required @require_POST def queue_manual_backup(request, host: str): host_config = get_object_or_404(HostConfig, host=host) if not host_config.enabled: messages.error(request, f"Cannot queue backup for disabled host {host_config.host}.") return redirect("host_detail", host=host_config.host) global_config = GlobalConfig.objects.filter(name="default").first() if global_config is None: messages.error(request, "Create the default global config before queueing backups.") return redirect("host_detail", host=host_config.host) form = ManualBackupForm(request.POST) if not form.is_valid(): messages.error(request, "Manual backup options are invalid.") return redirect("host_detail", host=host_config.host) backup_gate = collect_backup_gate(host_config, global_config) if form.cleaned_data["dry_run"]: if not backup_gate.can_queue_dry_run: blockers = ", ".join(check.name for check in backup_gate.dry_run_blockers) messages.error(request, f"Cannot queue dry-run until failed preflight checks are resolved: {blockers}.") return redirect("host_detail", host=host_config.host) elif not backup_gate.can_queue_real: blockers = ", ".join(check.name for check in backup_gate.real_blockers) messages.error(request, f"Cannot queue real backup until failed preflight checks are resolved: {blockers}.") return redirect("host_detail", host=host_config.host) run = queue_backup_run( host=host_config, dry_run=form.cleaned_data["dry_run"], verbose_output=form.cleaned_data["verbose_output"], prune=form.cleaned_data["prune"], prune_max_delete=form.cleaned_data["prune_max_delete"], prune_protect_bases=form.cleaned_data["prune_protect_bases"], ) messages.success(request, f"Queued 
manual backup run {run.id} for {host_config.host}.") return redirect("run_detail", run_id=run.id) @staff_member_required def run_detail(request, run_id: int): run = get_object_or_404(BackupRun.objects.select_related("host", "snapshot"), id=run_id) result = run.result if isinstance(run.result, dict) else {} run_stats = result.get("stats") if isinstance(result.get("stats"), dict) else {} rsync_result = result.get("rsync") if isinstance(result.get("rsync"), dict) else {} failure = result.get("failure") if isinstance(result.get("failure"), dict) else {} prune_result = result.get("prune") if isinstance(result.get("prune"), dict) else {} execution = result.get("execution") if isinstance(result.get("execution"), dict) else {} rsync_log_path = _run_rsync_log_path(run) rsync_log_tail = _run_rsync_log_tail(rsync_result, rsync_log_path) requested = result.get("requested") if isinstance(result.get("requested"), dict) else {} context = { "run": run, "can_cancel": run.status in {BackupRun.Status.QUEUED, BackupRun.Status.RUNNING}, "requested": requested, "execution": execution, "stats": run_stats if isinstance(run_stats, dict) else {}, "rsync": rsync_result, "rsync_command": _run_rsync_command(rsync_result), "failure": failure, "failure_summary": failure.get("message") or failure.get("summary") or "", "prune_result": prune_result, "has_prune_result": bool(prune_result), "retention_warning": _retention_warning_for_host(run.host, _schedule_for_host(run.host)), "rsync_log_path": str(rsync_log_path) if rsync_log_path is not None else "", "rsync_log_exists": bool(rsync_log_path and rsync_log_path.exists()), "rsync_log_tail": rsync_log_tail, "dry_run_summary": _dry_run_summary( result=result, requested=requested, stats=run_stats if isinstance(run_stats, dict) else {}, failure=failure, rsync_log_tail=rsync_log_tail, rsync_log_exists=bool(rsync_log_path and rsync_log_path.exists()), ), "result_json": _pretty_json(run.result), } return render(request, "pobsync_backend/run_detail.html", 
context) @staff_member_required def run_rsync_log(request, run_id: int): run = get_object_or_404(BackupRun.objects.select_related("host", "snapshot"), id=run_id) log_path = _run_rsync_log_path(run) if log_path is None or not log_path.is_file(): raise Http404("Rsync log not found") return FileResponse(log_path.open("rb"), content_type="text/plain; charset=utf-8") @staff_member_required @require_POST def cancel_run(request, run_id: int): run = get_object_or_404(BackupRun.objects.select_related("host"), id=run_id) if run.status not in {BackupRun.Status.QUEUED, BackupRun.Status.RUNNING}: messages.warning(request, f"Run {run.id} is already {run.status}.") return redirect("run_detail", run_id=run.id) result = dict(run.result) if isinstance(run.result, dict) else {} result["cancellation"] = { "requested_at": timezone.now().isoformat(), "previous_status": run.status, } update_fields = ["status", "result"] run.status = BackupRun.Status.CANCELLED run.result = result if result["cancellation"]["previous_status"] == BackupRun.Status.QUEUED: run.ended_at = timezone.now() update_fields.append("ended_at") run.save(update_fields=update_fields) messages.success(request, f"Cancellation requested for run {run.id}.") return redirect("run_detail", run_id=run.id) @staff_member_required def snapshot_detail(request, snapshot_id: int): snapshot = get_object_or_404( SnapshotRecord.objects.select_related("host", "base").prefetch_related("derived_snapshots", "backup_runs"), id=snapshot_id, ) restore = _snapshot_restore_guidance(snapshot) context = { "snapshot": snapshot, "stats": snapshot.metadata.get("stats") if isinstance(snapshot.metadata, dict) else {}, "metadata_json": _pretty_json(snapshot.metadata), "backup_runs": snapshot.backup_runs.select_related("host").order_by("-created_at"), "derived_snapshots": snapshot.derived_snapshots.select_related("host").order_by("-started_at", "dirname"), "restore": restore, } return render(request, "pobsync_backend/snapshot_detail.html", context) 
@staff_member_required
@require_POST
def discover_host_snapshots(request, host: str):
    """Run snapshot discovery for a host and report how many items were
    scanned, created and updated."""
    host_config = get_object_or_404(HostConfig, host=host)
    try:
        result = discover_snapshots(host=host_config)
    except Exception as exc:  # reported to the user instead of a 500
        messages.error(request, f"Snapshot discovery failed for {host_config.host}: {exc}")
    else:
        summary = (
            f"Snapshot discovery scanned {result['scanned']} items for {host_config.host}: "
            f"{result['created']} created, {result['updated']} updated."
        )
        if result["scanned"]:
            messages.success(request, summary)
        else:
            # Nothing scanned: append the discovery inspection message.
            discovery = inspect_snapshot_discovery(host=host_config)
            messages.warning(request, f"{summary} {discovery['message']}")
    return redirect("host_detail", host=host_config.host)


@staff_member_required
def host_retention_plan(request, host: str):
    """Show the retention plan for a host together with the confirmation
    forms for applying it and for cleaning up incomplete snapshots.

    Query parameters: ``kind`` (scheduled, manual or all; default scheduled)
    and ``protect_bases`` (true for 1/true/on/yes).
    """
    host_config = get_object_or_404(HostConfig, host=host)
    kind = request.GET.get("kind", "scheduled")
    if kind not in {"scheduled", "manual", "all"}:
        messages.error(request, "Retention kind must be scheduled, manual, or all.")
        return redirect("host_detail", host=host_config.host)
    protect_bases = request.GET.get("protect_bases") in {"1", "true", "on", "yes"}
    try:
        plan = run_sql_retention_plan(host=host_config.host, kind=kind, protect_bases=protect_bases)
    except PobsyncError as exc:
        messages.error(request, str(exc))
        return redirect("host_detail", host=host_config.host)
    schedule = _schedule_for_host(host_config)
    scheduled_prune_limit = schedule.prune_max_delete if schedule and schedule.prune else None
    # Same tolerant access to the plan as the apply/cleanup views use.
    delete_count = len(plan.get("delete") or [])
    incomplete_count = len(plan.get("incomplete") or [])
    context = {
        "host": host_config,
        "kind": kind,
        "protect_bases": protect_bases,
        "plan": plan,
        "schedule": schedule,
        "scheduled_prune_limit": scheduled_prune_limit,
        "scheduled_prune_exceeded": scheduled_prune_limit is not None and delete_count > scheduled_prune_limit,
        "apply_form": RetentionApplyForm(
            host_name=host_config.host,
            expected_delete_count=delete_count,
            initial={
                "kind": kind,
                "protect_bases": protect_bases,
                "max_delete": delete_count,
                "confirm_delete_count": delete_count,
            },
        ),
        "incomplete_cleanup_form": IncompleteCleanupForm(
            host_name=host_config.host,
            expected_delete_count=incomplete_count,
            initial={
                "max_delete": incomplete_count,
                "confirm_delete_count": incomplete_count,
            },
        ),
    }
    return render(request, "pobsync_backend/retention_plan.html", context)


@staff_member_required
@require_POST
def apply_host_retention(request, host: str):
    """Apply the retention plan for a host after confirming the delete count.

    The plan is recomputed from the posted ``kind``/``protect_bases`` so the
    form can compare the confirmation against the current expected count.
    Redirects back to the plan page with the same filters.
    """
    host_config = get_object_or_404(HostConfig, host=host)
    raw_kind = request.POST.get("kind", "scheduled")
    raw_protect_bases = request.POST.get("protect_bases") in {"1", "true", "on", "yes"}
    # Stays None for an unknown kind; the form then decides about validity.
    expected_delete_count = None
    if raw_kind in {"scheduled", "manual", "all"}:
        try:
            plan = run_sql_retention_plan(
                host=host_config.host,
                kind=raw_kind,
                protect_bases=raw_protect_bases,
            )
        except PobsyncError as exc:
            messages.error(request, str(exc))
            return redirect("host_retention_plan", host=host_config.host)
        expected_delete_count = len(plan.get("delete") or [])
    form = RetentionApplyForm(
        request.POST,
        host_name=host_config.host,
        expected_delete_count=expected_delete_count,
    )
    if not form.is_valid():
        messages.error(request, "Retention apply confirmation is invalid.")
        return redirect("host_retention_plan", host=host_config.host)
    kind = form.cleaned_data["kind"]
    protect_bases = bool(form.cleaned_data["protect_bases"])
    try:
        result = run_sql_retention_apply(
            prefix=Path(settings.POBSYNC_HOME),
            host=host_config.host,
            kind=kind,
            protect_bases=protect_bases,
            yes=True,
            max_delete=form.cleaned_data["max_delete"],
        )
    except PobsyncError as exc:
        messages.error(request, str(exc))
    else:
        messages.success(request, f"Retention deleted {len(result['deleted'])} snapshot(s) for {host_config.host}.")
    # Keep the filters the user applied when returning to the plan page.
    target = redirect("host_retention_plan", host=host_config.host)
    query = f"kind={kind}"
    if protect_bases:
        query += "&protect_bases=1"
    target["Location"] = f"{target['Location']}?{query}"
    return target


@staff_member_required
@require_POST
def cleanup_host_incomplete_snapshots(request, host: str):
    """Delete incomplete snapshots of a host after confirming the count."""
    host_config = get_object_or_404(HostConfig, host=host)
    try:
        plan = run_sql_retention_plan(host=host_config.host, kind="all", protect_bases=True)
    except PobsyncError as exc:
        messages.error(request, str(exc))
        return redirect("host_retention_plan", host=host_config.host)
    incomplete_count = len(plan.get("incomplete") or [])
    form = IncompleteCleanupForm(
        request.POST,
        host_name=host_config.host,
        expected_delete_count=incomplete_count,
    )
    if not form.is_valid():
        messages.error(request, "Incomplete cleanup confirmation is invalid.")
        return redirect("host_retention_plan", host=host_config.host)
    try:
        result = run_incomplete_cleanup(
            prefix=Path(settings.POBSYNC_HOME),
            host=host_config.host,
            yes=True,
            max_delete=form.cleaned_data["max_delete"],
        )
    except PobsyncError as exc:
        messages.error(request, str(exc))
    else:
        messages.success(request, f"Deleted {len(result['deleted'])} incomplete snapshot(s) for {host_config.host}.")
    return redirect("host_retention_plan", host=host_config.host)


@staff_member_required
def edit_host_config(request, host: str):
    """Edit a host config; effective-config checks are shown when the
    "default" global config exists."""
    host_config = get_object_or_404(HostConfig, host=host)
    global_config = GlobalConfig.objects.filter(name="default").first()
    config_checks = collect_effective_host_config_checks(host_config, global_config) if global_config else []
    if request.method == "POST":
        form = HostConfigForm(request.POST, instance=host_config)
        if form.is_valid():
            form.save()
            messages.success(request, f"Host config saved for {host_config.host}.")
            return redirect("host_detail", host=host_config.host)
    else:
        form = HostConfigForm(instance=host_config)
    return render(
        request,
        "pobsync_backend/host_form.html",
        {
            "host": host_config,
            "form": form,
            "config_checks": config_checks,
            "config_check_summary": summarize_self_checks(config_checks),
        },
    )


@staff_member_required
def edit_host_schedule(request, host: str):
    """Create or edit the schedule attached to a host."""
    host_config = get_object_or_404(HostConfig, host=host)
    schedule = _schedule_for_host(host_config)
    if request.method == "POST":
        form = ScheduleConfigForm(request.POST, instance=schedule)
        if form.is_valid():
            saved_schedule = form.save(commit=False)
            # The host comes from the URL, not from the submitted form.
            saved_schedule.host = host_config
            saved_schedule.save()
            messages.success(request, f"Schedule saved for {host_config.host}.")
            return redirect("host_detail", host=host_config.host)
    elif schedule:
        form = ScheduleConfigForm(instance=schedule)
    else:
        form = ScheduleConfigForm(initial=_default_schedule_initial())
    return render(
        request,
        "pobsync_backend/schedule_form.html",
        {
            "host": host_config,
            "schedule": schedule,
            "form": form,
        },
    )


def _schedule_for_host(host_config: HostConfig) -> ScheduleConfig | None:
    """Return the host's schedule, or ``None`` when it has none."""
    try:
        return host_config.schedule
    except ScheduleConfig.DoesNotExist:
        return None


def _next_run_for_host(host_config: HostConfig):
    """Return the next scheduled run time for a host, or ``None``."""
    return _next_run_for_schedule(_schedule_for_host(host_config), host_config)


def _next_run_for_schedule(schedule: ScheduleConfig | None, host_config: HostConfig):
    """Return the next due time after now for ``schedule``.

    Returns ``None`` when there is no schedule, when the schedule or the host
    is disabled, or when ``next_due_after`` raises ``ValueError``.
    """
    if schedule is None or not schedule.enabled or not host_config.enabled:
        return None
    try:
        return next_due_after(schedule.cron_expr, timezone.localtime(timezone.now()))
    except ValueError:
        return None


def _retention_warning_for_host(host_config: HostConfig, schedule: ScheduleConfig | None) -> dict[str, object]:
    """Build the retention warning shown for a host.

    Always contains ``has_warning`` and ``incomplete_count``. When the host is
    enabled and its schedule prunes, the scheduled retention plan is evaluated
    and ``delete_count``, ``max_delete``, ``protect_bases`` and
    ``prune_exceeded`` are added; a ``PobsyncError`` from planning is returned
    under ``error`` instead.
    """
    incomplete_count = host_config.snapshots.filter(kind=SnapshotRecord.Kind.INCOMPLETE).count()
    warning: dict[str, object] = {
        "has_warning": incomplete_count > 0,
        "incomplete_count": incomplete_count,
    }
    if schedule is None or not schedule.prune or not host_config.enabled:
        return warning
    try:
        plan = run_sql_retention_plan(
            host=host_config.host,
            kind="scheduled",
            protect_bases=bool(schedule.prune_protect_bases),
        )
    except PobsyncError as exc:
        warning.update(
            {
                "has_warning": True,
                "error": str(exc),
            }
        )
        return warning
    delete_count = len(plan.get("delete") or [])
    warning.update(
        {
            "delete_count": delete_count,
            "max_delete": schedule.prune_max_delete,
            "protect_bases": bool(schedule.prune_protect_bases),
            "prune_exceeded": delete_count > schedule.prune_max_delete,
        }
    )
    if warning["prune_exceeded"]:
        warning["has_warning"] = True
    return warning


def _default_schedule_initial() -> dict[str, object]:
    """Initial form values for a new schedule."""
    return {
        "cron_expr": "15 2 * * *",
        "enabled": True,
        "prune_max_delete": 10,
    }


def _default_global_initial() -> dict[str, object]:
    """Initial form values for the first global config."""
    return {
        "name": "default",
        "default_ssh_credential": SshCredential.objects.order_by("name").first(),
        "ssh_user": "root",
        "ssh_port": 22,
        "rsync_binary": "rsync",
        "rsync_args": DEFAULT_RSYNC_ARGS,
        "default_source_root": "/",
        "excludes_default": DEFAULT_EXCLUDES,
        "retention_daily": 14,
        "retention_weekly": 8,
        "retention_monthly": 12,
        "retention_yearly": 0,
    }


def _default_host_initial() -> dict[str, object]:
    """Initial form values for a new host, copied from the "default" global
    config when it exists, otherwise fixed retention defaults."""
    global_config = GlobalConfig.objects.filter(name="default").first()
    if global_config is not None:
        return {
            "enabled": True,
            "ssh_credential": global_config.default_ssh_credential,
            "ssh_user": global_config.ssh_user,
            "ssh_port": global_config.ssh_port,
            "source_root": global_config.default_source_root,
            "retention_daily": global_config.retention_daily,
            "retention_weekly": global_config.retention_weekly,
            "retention_monthly": global_config.retention_monthly,
            "retention_yearly": global_config.retention_yearly,
        }
    return {
        "enabled": True,
        "retention_daily": 14,
        "retention_weekly": 8,
        "retention_monthly": 12,
        "retention_yearly": 0,
    }


def _default_manual_backup_initial(host_config: HostConfig) -> dict[str, object]:
    """Initial values for the manual backup form: dry run enabled, prune
    options mirrored from the host's schedule when it has one."""
    schedule = _schedule_for_host(host_config)
    return {
        "dry_run": True,
        "prune": bool(schedule.prune) if schedule else False,
        "prune_max_delete": schedule.prune_max_delete if schedule else 10,
        "prune_protect_bases": bool(schedule.prune_protect_bases) if schedule else False,
    }


def _pretty_json(value: object) -> str:
    """Serialize ``value`` (or ``{}`` when it is falsy) as indented, key-sorted JSON."""
    return json.dumps(value or {}, indent=2, sort_keys=True)


def _parse_changelog(text: str) -> list[dict[str, object]]:
    """Parse a small Markdown subset into display blocks.

    Recognized constructs: ``#`` headings (level clamped to 1..3), ``- `` list
    items, and paragraphs made of consecutive other lines. Blank lines end the
    current paragraph or list.

    Returns a list of dicts with ``kind`` set to ``"heading"`` (``level``,
    ``text``), ``"list"`` (``items``) or ``"paragraph"`` (``text``).
    """
    blocks: list[dict[str, object]] = []
    paragraph: list[str] = []
    list_items: list[str] = []

    def flush_paragraph() -> None:
        if paragraph:
            blocks.append({"kind": "paragraph", "text": " ".join(paragraph)})
            paragraph.clear()

    def flush_list() -> None:
        if list_items:
            blocks.append({"kind": "list", "items": list(list_items)})
            list_items.clear()

    for raw_line in text.splitlines():
        line = raw_line.strip()
        if not line:
            flush_paragraph()
            flush_list()
            continue
        if line.startswith("#"):
            flush_paragraph()
            flush_list()
            title = line.lstrip("#")
            # Count only the leading '#' characters, so "##Title" (no space
            # after the marker) gets level 2 and keeps its complete title.
            hashes = len(line) - len(title)
            blocks.append({"kind": "heading", "level": min(hashes, 3), "text": title.strip()})
            continue
        if line.startswith("- "):
            flush_paragraph()
            list_items.append(line[2:].strip())
            continue
        flush_list()
        paragraph.append(line)
    flush_paragraph()
    flush_list()
    return blocks


def _snapshot_restore_guidance(snapshot: SnapshotRecord) -> dict[str, str]:
    """Build shell-quoted example commands for inspecting and restoring a snapshot.

    The source is ``<snapshot.path>/data`` and the local destination is
    ``/restore/<host>``. The remote example targets ``root@<address or host>:/``.
    """
    source_path = Path(snapshot.path) / "data"
    destination_path = Path("/restore") / snapshot.host.host
    example_relative_path = Path("etc") / "nginx"
    example_file_relative_path = Path("home") / "example" / "site" / "public_html" / "index.php"
    quoted_source = _quote_path_with_trailing_slash(source_path)
    quoted_destination = _quote_path_with_trailing_slash(destination_path)
    quoted_partial_source = _quote_path_with_trailing_slash(source_path / example_relative_path)
    quoted_partial_destination = _quote_path_with_trailing_slash(destination_path / example_relative_path)
    quoted_file_source = shlex.quote(str(source_path / example_file_relative_path))
    quoted_file_destination = shlex.quote(str(destination_path / example_file_relative_path))
    quoted_remote_destination = shlex.quote(f"root@{snapshot.host.address or snapshot.host.host}:/")
    common_args = "rsync -aHAX --numeric-ids --info=progress2"
    return {
        "source_path": str(source_path),
        "destination_path": str(destination_path),
        "example_relative_path": str(example_relative_path),
        "example_file_relative_path": str(example_file_relative_path),
        "inspect_command": f"ls -la {quoted_source}",
        "dry_run_command": f"{common_args} --dry-run {quoted_source} {quoted_destination}",
        "local_command": f"{common_args} {quoted_source} {quoted_destination}",
        "partial_dry_run_command": f"{common_args} --dry-run {quoted_partial_source} {quoted_partial_destination}",
        "file_dry_run_command": f"{common_args} --dry-run {quoted_file_source} {quoted_file_destination}",
        "remote_dry_run_command": f"{common_args} --dry-run {quoted_source} {quoted_remote_destination}",
    }


def _quote_path_with_trailing_slash(path: Path) -> str:
    """Return ``path`` shell-quoted with exactly one trailing slash."""
    return shlex.quote(str(path).rstrip("/") + "/")


def _run_rsync_log_path(run: BackupRun) -> Path | None:
    """Locate the rsync log of a run.

    Lookup order: ``result["log"]``, ``result["execution"]["log"]``, then
    ``<snapshot_path>/meta/rsync.log``. Returns ``None`` when none applies.
    """
    if isinstance(run.result, dict):
        log = run.result.get("log")
        if isinstance(log, str) and log:
            return Path(log)
        execution = run.result.get("execution")
        if isinstance(execution, dict):
            execution_log = execution.get("log")
            if isinstance(execution_log, str) and execution_log:
                return Path(execution_log)
    if run.snapshot_path:
        return Path(run.snapshot_path) / "meta" / "rsync.log"
    return None


def _run_rsync_command(rsync_result: dict) -> list[str]:
    """Return the recorded rsync command as a list of strings, or ``[]``
    when ``command`` is missing or not a list."""
    command = rsync_result.get("command")
    if not isinstance(command, list):
        return []
    return [str(part) for part in command]


def _run_rsync_log_tail(rsync_result: dict, log_path: Path | None, *, max_lines: int = 30) -> list[str]:
    """Return up to ``max_lines`` trailing lines of the rsync log.

    A ``log_tail`` list stored in ``rsync_result`` takes precedence over the
    file at ``log_path``. An unreadable or missing file, or a non-positive
    ``max_lines``, yields ``[]``.
    """
    from collections import deque

    if max_lines <= 0:
        return []
    log_tail = rsync_result.get("log_tail")
    if isinstance(log_tail, list):
        return [str(line) for line in log_tail[-max_lines:]]
    if log_path is None or not log_path.is_file():
        return []
    try:
        with log_path.open("r", encoding="utf-8", errors="replace") as handle:
            # Stream through a bounded deque so only ``max_lines`` lines are
            # kept in memory instead of the whole log file.
            tail = deque(handle, maxlen=max_lines)
    except OSError:
        return []
    return [line.rstrip("\n") for line in tail]


def _dry_run_summary(
    *,
    result: dict,
    requested: dict,
    stats: dict,
    failure: dict,
    rsync_log_tail: list[str],
    rsync_log_exists: bool,
) -> dict[str, object]:
    """Summarize a dry run for display; returns ``{}`` for non-dry runs.

    Warnings are collected from the failure message and hint plus any log
    tail line mentioning "warning", "permission denied" or "failed"
    (case-insensitive), de-duplicated in first-seen order.
    """
    if not (result.get("dry_run") or requested.get("dry_run")):
        return {}
    rsync_stats = stats.get("rsync") if isinstance(stats.get("rsync"), dict) else {}
    warnings = []
    if failure:
        message = failure.get("message") or failure.get("summary")
        hint = failure.get("hint")
        if message:
            warnings.append(str(message))
        if hint:
            warnings.append(str(hint))
    for line in rsync_log_tail:
        lowered = line.lower()
        if "warning" in lowered or "permission denied" in lowered or "failed" in lowered:
            warnings.append(line)
    ok = result.get("ok")
    if ok:
        status, highlight_class = "passed", "success"
    elif ok is False:
        status, highlight_class = "failed", "failed"
    else:
        # No definite outcome recorded yet.
        status, highlight_class = "running", "warning"
    return {
        "ok": ok,
        "status": status,
        "highlight_class": highlight_class,
        "files_seen": rsync_stats.get("files_total"),
        "files_would_transfer": rsync_stats.get("files_transferred"),
        "total_file_size_bytes": rsync_stats.get("total_file_size_bytes"),
        "transfer_estimate_bytes": rsync_stats.get("total_transferred_file_size_bytes")
        or rsync_stats.get("literal_data_bytes"),
        "link_dest_estimated_savings_bytes": rsync_stats.get("link_dest_estimated_savings_bytes"),
        "duration_seconds": stats.get("duration_seconds"),
        "log_available": rsync_log_exists,
        # dict.fromkeys drops duplicates while keeping the original order.
        "warnings": list(dict.fromkeys(warnings)),
    }


def _log_context(request) -> dict[str, object]:
    """Collect journal log lines for the log viewer.

    Query parameters: ``unit``, ``priority`` (default ``0..4``), ``window``
    (default ``24h``), ``host``, ``run`` and ``q``. An unknown ``window`` or
    ``priority`` falls back to its default; an unknown ``unit`` selects all
    three pobsync units.

    ``journalctl`` is run with a 10 second timeout. A missing binary, a
    timeout, a launch failure or a non-zero exit status is returned under
    ``error`` rather than raised.
    """
    units = ("pobsync-web.service", "pobsync-worker.service", "pobsync-scheduler.service")
    priorities = {
        "": "All",
        "0..3": "Errors",
        "4": "Warnings",
        "5": "Notices",
        "6": "Info",
        "7": "Debug",
    }
    time_windows = {
        "1h": "Last hour",
        "6h": "Last 6 hours",
        "24h": "Last 24 hours",
        "7d": "Last 7 days",
        "": "All available",
    }
    since_values = {
        "1h": "1 hour ago",
        "6h": "6 hours ago",
        "24h": "24 hours ago",
        "7d": "7 days ago",
    }
    default_priority = "0..4"
    selected_unit = request.GET.get("unit", "")
    priority = request.GET.get("priority", default_priority)
    if priority not in priorities:
        # Only known values reach journalctl; anything else uses the default.
        priority = default_priority
    time_window = request.GET.get("window", "24h")
    if time_window not in time_windows:
        time_window = "24h"
    host_filter = request.GET.get("host", "").strip()
    run_filter = request.GET.get("run", "").strip()
    query = request.GET.get("q", "").strip()
    lines = []
    error = ""
    if shutil.which("journalctl") is None:
        error = "journalctl is not available in this runtime."
    else:
        command = ["journalctl", "--no-pager", "-n", "300", "-o", "short-iso"]
        if time_window:
            command.extend(["--since", since_values[time_window]])
        if selected_unit in units:
            command.extend(["-u", selected_unit])
        else:
            for unit in units:
                command.extend(["-u", unit])
        if priority:
            command.extend(["-p", priority])
        try:
            result = subprocess.run(
                command,
                check=False,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                errors="replace",  # journal output may contain undecodable bytes
                timeout=10,
            )
        except subprocess.TimeoutExpired:
            error = "Timed out while reading journal logs."
        except OSError as exc:
            error = f"Could not run journalctl: {exc}"
        else:
            if result.returncode != 0:
                error = result.stderr.strip() or "Could not read journal logs."
            else:
                lines = _filter_log_lines(
                    result.stdout.splitlines(),
                    query=query,
                    host=host_filter,
                    run_id=run_filter,
                )
    return {
        "units": units,
        "priorities": priorities,
        "time_windows": time_windows,
        "selected_unit": selected_unit,
        "selected_priority": priority,
        "selected_window": time_window,
        "host_filter": host_filter,
        "run_filter": run_filter,
        "query": query,
        "lines": lines,
        "error": error,
    }


def _filter_log_lines(lines: list[str], *, query: str, host: str, run_id: str) -> list[str]:
    """Keep the lines matching every non-empty filter, case-insensitively.

    ``query`` and ``host`` are substring matches. ``run_id`` matches when the
    line contains one of ``run <id>``, ``run=<id>``, ``run_id=<id>``,
    ``run-<id>`` or ``#<id>``. With no filters the input list is returned
    unchanged.
    """
    needles = [text.lower() for text in (query, host) if text]
    run_tokens: tuple[str, ...] = ()
    if run_id:
        # Lowercased because the tokens are compared against a lowercased line.
        run_key = run_id.lower()
        run_tokens = (
            f"run {run_key}",
            f"run={run_key}",
            f"run_id={run_key}",
            f"run-{run_key}",
            f"#{run_key}",
        )
    if not needles and not run_tokens:
        return lines
    matched = []
    for line in lines:
        lowered = line.lower()
        if not all(needle in lowered for needle in needles):
            continue
        if run_tokens and not any(token in lowered for token in run_tokens):
            continue
        matched.append(line)
    return matched