from __future__ import annotations import subprocess from datetime import datetime, timezone from pathlib import Path from tempfile import TemporaryDirectory from unittest.mock import patch from django.contrib.auth import get_user_model from django.core.files.uploadedfile import SimpleUploadedFile from django.template.defaultfilters import filesizeformat from django.test import TestCase, override_settings from django.urls import reverse from pobsync.run_stats import tree_usage from pobsync.util import write_yaml_atomic from pobsync_backend.models import ( BackupRun, GlobalConfig, HostConfig, NotificationDelivery, NotificationTarget, PurgedSnapshot, ScheduleConfig, SnapshotRecord, SshCredential, ) class ViewTests(TestCase): def setUp(self) -> None: user_model = get_user_model() self.staff_user = user_model.objects.create_user( username="admin", password="secret", is_staff=True, is_superuser=True, ) def test_dashboard_requires_staff_login(self) -> None: response = self.client.get(reverse("dashboard")) self.assertEqual(response.status_code, 302) self.assertIn("/admin/login/", response["Location"]) def test_base_navigation_groups_primary_and_system_links(self) -> None: self.client.force_login(self.staff_user) response = self.client.get(reverse("dashboard")) self.assertEqual(response.status_code, 200) self.assertContains(response, 'aria-label="Primary navigation"', html=False) self.assertContains(response, 'aria-label="System navigation"', html=False) self.assertContains(response, reverse("dashboard")) self.assertContains(response, reverse("hosts_list")) self.assertContains(response, reverse("ssh_credentials")) self.assertContains(response, reverse("notification_targets")) self.assertContains(response, reverse("logs")) self.assertContains(response, reverse("purged_snapshots")) self.assertContains(response, reverse("self_check")) self.assertContains(response, reverse("changelog")) self.assertContains(response, "/api/status/") self.assertContains(response, reverse("admin:index")) self.assertContains(response, 'Dashboard', html=False) def test_base_navigation_marks_current_secondary_page(self) -> None: self.client.force_login(self.staff_user) response = self.client.get(reverse("self_check")) self.assertEqual(response.status_code, 200) self.assertContains(response, f'Self Check', html=False) def test_changelog_requires_staff_login(self) -> None: response = self.client.get(reverse("changelog")) self.assertEqual(response.status_code, 302) self.assertIn("/admin/login/", response["Location"]) def test_changelog_renders_repository_changelog(self) -> None: self.client.force_login(self.staff_user) with TemporaryDirectory() as tmp: changelog = Path(tmp) / "CHANGELOG.md" changelog.write_text( "# Changelog\n\n## 1.0.0 - 2026-05-21\n\n- Django control panel\n- Native systemd installer\n", encoding="utf-8", ) with override_settings(BASE_DIR=Path(tmp)): response = self.client.get(reverse("changelog")) self.assertEqual(response.status_code, 200) self.assertContains(response, "Installed version:") self.assertContains(response, "Changelog file:") self.assertNotContains(response, "Source:") self.assertContains(response, "1.0.0 - 2026-05-21") self.assertContains(response, "Django control panel") self.assertContains(response, "Native systemd installer") def test_notification_targets_view_renders_targets_and_deliveries(self) -> None: self.client.force_login(self.staff_user) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") run = BackupRun.objects.create(host=host, status=BackupRun.Status.SUCCESS) target = NotificationTarget.objects.create( name="ops", channel=NotificationTarget.Channel.EMAIL, email_to="ops@example.test", last_status=NotificationDelivery.Status.SENT, ) NotificationDelivery.objects.create(target=target, run=run, status=NotificationDelivery.Status.SENT) response = self.client.get(reverse("notification_targets")) self.assertEqual(response.status_code, 200) self.assertContains(response, "Notifications") self.assertContains(response, "ops") self.assertContains(response, "ops@example.test") self.assertContains(response, f"Run {run.id}") def test_notification_target_form_creates_email_target(self) -> None: self.client.force_login(self.staff_user) response = self.client.post( reverse("create_notification_target"), { "name": "ops", "enabled": "on", "channel": NotificationTarget.Channel.EMAIL, "statuses": [BackupRun.Status.FAILED, BackupRun.Status.WARNING], "email_to": "ops@example.test, backup@example.test", "webhook_headers": "{}", "notes": "Notify ops", }, follow=True, ) self.assertRedirects(response, reverse("notification_targets")) self.assertContains(response, "Notification target ops created.") target = NotificationTarget.objects.get(name="ops") self.assertEqual(target.channel, NotificationTarget.Channel.EMAIL) self.assertEqual(target.statuses, [BackupRun.Status.FAILED, BackupRun.Status.WARNING]) self.assertEqual(target.email_to, "ops@example.test\nbackup@example.test") def test_notification_target_form_requires_channel_destination(self) -> None: self.client.force_login(self.staff_user) response = self.client.post( reverse("create_notification_target"), { "name": "broken", "enabled": "on", "channel": NotificationTarget.Channel.WEBHOOK, "statuses": [BackupRun.Status.FAILED], "email_to": "", "webhook_url": "", "webhook_headers": "{}", }, ) self.assertEqual(response.status_code, 200) self.assertContains(response, "Webhook targets need a URL.") self.assertFalse(NotificationTarget.objects.exists()) def test_dashboard_renders_hosts_and_latest_runs(self) -> None: self.client.force_login(self.staff_user) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") snapshot = self._snapshot(host, "20260519-021500Z__ABCDEFGH") run = BackupRun.objects.create( host=host, run_type=BackupRun.RunType.MANUAL, status=BackupRun.Status.SUCCESS, snapshot=snapshot, started_at=datetime(2026, 5, 19, 2, 15, tzinfo=timezone.utc), ) warning_run = BackupRun.objects.create( host=host, run_type=BackupRun.RunType.SCHEDULED, status=BackupRun.Status.WARNING, started_at=datetime(2026, 5, 19, 3, 15, tzinfo=timezone.utc), result={ "ok": True, "prune": { "ok": False, "error": "Retention warning", }, }, ) BackupRun.objects.create(host=host, status=BackupRun.Status.QUEUED) BackupRun.objects.create(host=host, status=BackupRun.Status.RUNNING) BackupRun.objects.create( host=host, run_type=BackupRun.RunType.MANUAL, status=BackupRun.Status.FAILED, started_at=datetime(2026, 5, 19, 1, 15, tzinfo=timezone.utc), ) response = self.client.get(reverse("dashboard")) self.assertEqual(response.status_code, 200) self.assertContains(response, "Control panel") self.assertContains(response, "Backup health, required action, storage pressure, and recent activity in one place.") self.assertContains(response, "dashboard-panel-required") self.assertContains(response, "dashboard-panel-schedules") self.assertContains(response, "dashboard-panel-activity") self.assertContains(response, "dashboard-panel-storage") self.assertContains(response, "dashboard-summary-grid") self.assertContains(response, "dashboard-trends-panel") self.assertContains(response, "dashboard-hosts-panel") self.assertContains(response, "Dashboard") self.assertContains(response, "web-01") self.assertContains(response, "20260519-021500Z__ABCDEFGH") self.assertContains(response, "success") self.assertContains(response, "Last Good Backup") self.assertContains(response, "Latest Issue") self.assertContains(response, f"Run {run.id}") self.assertContains(response, f"Run {warning_run.id}") self.assertContains(response, "warning") self.assertContains(response, "manual") self.assertContains(response, "scheduled") self.assertContains(response, "Backup activity") self.assertContains(response, "Snapshot health") self.assertContains(response, "queued 1") self.assertContains(response, "running 1") self.assertContains(response, "warning 1") self.assertContains(response, "failed 1") self.assertContains(response, "Required Action") self.assertContains(response, "Failed runs") self.assertContains(response, "1 failed run(s) need review.") self.assertContains(response, "1 run(s) completed with warnings.") self.assertContains(response, "1 backup run in progress.") self.assertContains(response, "1 backup run waiting.") self.assertContains(response, "Next Scheduled Work") self.assertContains(response, "Recent Activity") self.assertContains(response, f'data-refresh-url="{reverse("dashboard_priority_live")}"', html=False) self.assertContains(response, f'data-refresh-url="{reverse("dashboard_hosts_live")}"', html=False) self.assertContains(response, f'href="{reverse("runs_list")}"', html=False) self.assertContains(response, f'href="{reverse("runs_list")}?status=queued"', html=False) self.assertContains(response, f'href="{reverse("runs_list")}?status=running"', html=False) self.assertContains(response, f'href="{reverse("runs_list")}?status=warning&review=needed"', html=False) self.assertContains(response, f'href="{reverse("runs_list")}?status=failed&review=needed"', html=False) self.assertContains( response, f'href="{reverse("runs_list")}?host=web-01&status=failed&review=needed"', html=False, ) self.assertContains(response, f'href="{reverse("snapshots_list")}"', html=False) self.assertContains(response, f'href="{reverse("schedules_list")}"', html=False) def test_dashboard_priority_live_returns_status_partial(self) -> None: self.client.force_login(self.staff_user) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") BackupRun.objects.create(host=host, status=BackupRun.Status.RUNNING) response = self.client.get(reverse("dashboard_priority_live")) self.assertEqual(response.status_code, 200) self.assertContains(response, "Required Action") self.assertContains(response, "Recent Activity") self.assertContains(response, "running") self.assertNotContains(response, " None: self.client.force_login(self.staff_user) web = HostConfig.objects.create(host="web-01", address="web-01.example.test") db = HostConfig.objects.create(host="db-01", address="db-01.example.test") scheduled = self._snapshot(web, "20260519-021500Z__SCHED01", kind=SnapshotRecord.Kind.SCHEDULED) manual = self._snapshot(web, "20260519-031500Z__MANUAL1", kind=SnapshotRecord.Kind.MANUAL) self._set_snapshot_storage(scheduled, allocated=100) self._set_snapshot_storage(manual, allocated=200) with TemporaryDirectory() as tmp: incomplete_dir = Path(tmp) / db.host / ".incomplete" / "20260519-041500Z__BROKEN1" data_dir = incomplete_dir / "data" data_dir.mkdir(parents=True) data_dir.joinpath("partial-file").write_text("interrupted backup data\n", encoding="utf-8") expected_usage = tree_usage(data_dir) SnapshotRecord.objects.create( host=db, kind=SnapshotRecord.Kind.INCOMPLETE, dirname=incomplete_dir.name, path=str(incomplete_dir), status="failed", ) response = self.client.get(reverse("dashboard_priority_live")) self.assertEqual(response.status_code, 200) self.assertContains(response, "Scheduled data") self.assertContains(response, "Manual data") self.assertContains(response, "Incomplete data") self.assertContains(response, "Total snapshot data") self.assertContains(response, "100 bytes", html=True) self.assertContains(response, "200 bytes", html=True) self.assertContains(response, filesizeformat(expected_usage["allocated_size_bytes"])) self.assertContains(response, filesizeformat(300 + expected_usage["allocated_size_bytes"])) def test_dashboard_hosts_live_returns_hosts_partial(self) -> None: self.client.force_login(self.staff_user) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") BackupRun.objects.create(host=host, status=BackupRun.Status.QUEUED) response = self.client.get(reverse("dashboard_hosts_live")) self.assertEqual(response.status_code, 200) self.assertContains(response, "web-01") self.assertContains(response, "queued 1") self.assertContains(response, "Snapshot health") self.assertNotContains(response, " None: self.client.force_login(self.staff_user) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") scheduled = self._snapshot(host, "20260519-021500Z__SCHED01", kind=SnapshotRecord.Kind.SCHEDULED) manual = self._snapshot(host, "20260519-031500Z__MANUAL1", kind=SnapshotRecord.Kind.MANUAL) self._set_snapshot_storage(scheduled, allocated=100) self._set_snapshot_storage(manual, allocated=200) with TemporaryDirectory() as tmp: incomplete_dir = Path(tmp) / host.host / ".incomplete" / "20260519-041500Z__BROKEN1" data_dir = incomplete_dir / "data" data_dir.mkdir(parents=True) data_dir.joinpath("partial-file").write_text("interrupted backup data\n", encoding="utf-8") expected_usage = tree_usage(data_dir) SnapshotRecord.objects.create( host=host, kind=SnapshotRecord.Kind.INCOMPLETE, dirname=incomplete_dir.name, path=str(incomplete_dir), status="failed", ) response = self.client.get(reverse("dashboard_hosts_live")) self.assertEqual(response.status_code, 200) self.assertContains(response, "Scheduled data") self.assertContains(response, "Manual data") self.assertContains(response, "Incomplete data") self.assertContains(response, "Total data") self.assertContains(response, "100 bytes", html=True) self.assertContains(response, "200 bytes", html=True) self.assertContains(response, filesizeformat(expected_usage["allocated_size_bytes"])) self.assertContains(response, filesizeformat(300 + expected_usage["allocated_size_bytes"])) def test_dashboard_host_cards_measure_incomplete_data_without_snapshot_metadata(self) -> None: self.client.force_login(self.staff_user) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") with TemporaryDirectory() as tmp: incomplete_dir = Path(tmp) / host.host / ".incomplete" / "20260519-041500Z__BROKEN1" data_dir = incomplete_dir / "data" data_dir.mkdir(parents=True) data_dir.joinpath("partial-file").write_text("interrupted backup data\n", encoding="utf-8") expected_usage = tree_usage(data_dir) SnapshotRecord.objects.create( host=host, kind=SnapshotRecord.Kind.INCOMPLETE, dirname=incomplete_dir.name, path=str(incomplete_dir), status="failed", metadata={}, ) response = self.client.get(reverse("dashboard_hosts_live")) self.assertEqual(response.status_code, 200) self.assertContains(response, "Incomplete data") self.assertContains(response, filesizeformat(expected_usage["allocated_size_bytes"])) def test_hosts_list_renders_host_cards_and_controls(self) -> None: self.client.force_login(self.staff_user) web = HostConfig.objects.create(host="web-01", address="web-01.example.test") db = HostConfig.objects.create(host="db-01", address="db-01.example.test", enabled=False) ScheduleConfig.objects.create(host=web, cron_expr="15 2 * * *", enabled=True, prune=True) BackupRun.objects.create(host=web, status=BackupRun.Status.RUNNING) response = self.client.get(reverse("hosts_list")) self.assertEqual(response.status_code, 200) self.assertContains(response, "Inventory") self.assertContains(response, "Configured backup targets") self.assertContains(response, "web-01") self.assertContains(response, "db-01") self.assertContains(response, "running 1") self.assertContains(response, "schedule on") self.assertContains(response, "retention on") self.assertContains(response, "Disable host") self.assertContains(response, "Enable host") self.assertContains(response, "Pause schedule") self.assertContains(response, "Pause retention") self.assertContains(response, reverse("update_host_state", args=[web.host])) self.assertContains(response, reverse("edit_host_config", args=[web.host])) self.assertContains(response, reverse("edit_host_schedule", args=[web.host])) def test_hosts_list_filters_by_enabled_state(self) -> None: self.client.force_login(self.staff_user) HostConfig.objects.create(host="web-01", address="web-01.example.test") HostConfig.objects.create(host="db-01", address="db-01.example.test", enabled=False) response = self.client.get(reverse("hosts_list"), {"enabled": "no"}) self.assertEqual(response.status_code, 200) self.assertContains(response, "db-01") self.assertNotContains(response, "web-01") def test_update_host_state_toggles_host_schedule_and_retention(self) -> None: self.client.force_login(self.staff_user) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") schedule = ScheduleConfig.objects.create(host=host, cron_expr="15 2 * * *", enabled=True, prune=True) response = self.client.post( reverse("update_host_state", args=[host.host]), {"action": "disable_host", "next": reverse("hosts_list")}, follow=True, ) self.assertRedirects(response, reverse("hosts_list")) host.refresh_from_db() self.assertFalse(host.enabled) self.assertContains(response, "Disabled host web-01.") self.client.post(reverse("update_host_state", args=[host.host]), {"action": "disable_schedule"}) self.client.post(reverse("update_host_state", args=[host.host]), {"action": "disable_prune"}) schedule.refresh_from_db() self.assertFalse(schedule.enabled) self.assertFalse(schedule.prune) self.client.post(reverse("update_host_state", args=[host.host]), {"action": "enable_host"}) self.client.post(reverse("update_host_state", args=[host.host]), {"action": "enable_schedule"}) self.client.post(reverse("update_host_state", args=[host.host]), {"action": "enable_prune"}) host.refresh_from_db() schedule.refresh_from_db() self.assertTrue(host.enabled) self.assertTrue(schedule.enabled) self.assertTrue(schedule.prune) def test_dashboard_renders_backup_trend_summary(self) -> None: self.client.force_login(self.staff_user) GlobalConfig.objects.create(name="default", backup_root="/missing-backup-root") host = HostConfig.objects.create(host="web-01", address="web-01.example.test") snapshot = self._snapshot(host, "20260519-021500Z__ABCDEFGH") run = BackupRun.objects.create( host=host, run_type=BackupRun.RunType.MANUAL, status=BackupRun.Status.SUCCESS, snapshot=snapshot, started_at=datetime(2026, 5, 19, 2, 15, tzinfo=timezone.utc), result={ "ok": True, "dry_run": False, "stats": { "duration_seconds": 30, "rsync": { "files_total": 100, "literal_data_bytes": 1000, "matched_data_bytes": 4000, }, "storage": { "capacity": { "available_bytes": 10_000, "used_percent": 25.0, } }, }, }, ) ScheduleConfig.objects.create(host=host, cron_expr="* * * * *", enabled=True) response = self.client.get(reverse("dashboard")) self.assertEqual(response.status_code, 200) self.assertContains(response, "Backup Trends") self.assertContains(response, "Storage Pressure") self.assertContains(response, "Backup root used") self.assertContains(response, "Runway") self.assertContains(response, "New Data") self.assertContains(response, "Link-Dest Savings") self.assertContains(response, "80.0%") self.assertContains(response, "10 days") self.assertContains(response, "Warnings") self.assertContains(response, "Next Run") self.assertContains(response, "UTC") self.assertContains(response, "10") self.assertContains(response, f"Run {run.id}") self.assertContains(response, "manual") self.assertContains(response, "1000") def test_dashboard_explains_missing_backup_trends(self) -> None: self.client.force_login(self.staff_user) GlobalConfig.objects.create(name="default", backup_root="/opt/pobsync/backups") HostConfig.objects.create(host="web-01", address="web-01.example.test") response = self.client.get(reverse("dashboard")) self.assertEqual(response.status_code, 200) self.assertContains(response, "Backup Trends") self.assertContains(response, "No completed backup runs with stats yet.") self.assertContains(response, "growth estimates") def test_dashboard_shows_all_clear_operational_status(self) -> None: self.client.force_login(self.staff_user) GlobalConfig.objects.create(name="default", backup_root="/opt/pobsync/backups") HostConfig.objects.create(host="web-01", address="web-01.example.test") response = self.client.get(reverse("dashboard")) self.assertEqual(response.status_code, 200) self.assertContains(response, "Required Action") self.assertContains(response, "No queued, running, unreviewed warning/failed runs, or retention warnings.") def test_runs_list_filters_by_status_and_review(self) -> None: self.client.force_login(self.staff_user) web = HostConfig.objects.create(host="web-01", address="web-01.example.test") db = HostConfig.objects.create(host="db-01", address="db-01.example.test") failed = BackupRun.objects.create(host=web, status=BackupRun.Status.FAILED, run_type=BackupRun.RunType.MANUAL) success = BackupRun.objects.create(host=db, status=BackupRun.Status.SUCCESS, run_type=BackupRun.RunType.SCHEDULED) BackupRun.objects.create( host=web, status=BackupRun.Status.WARNING, reviewed_at=datetime(2026, 5, 19, 4, 15, tzinfo=timezone.utc), reviewed_by="admin", ) response = self.client.get(reverse("runs_list"), {"status": "failed", "review": "needed"}) self.assertEqual(response.status_code, 200) self.assertContains(response, "Runs") self.assertContains(response, "Review queued, running, completed") self.assertContains(response, "Apply filters") self.assertContains(response, reverse("runs_list")) self.assertContains(response, "Clear") self.assertContains(response, f"Run {failed.id}") self.assertContains(response, "web-01") self.assertContains(response, "needed") self.assertNotContains(response, f"Run {success.id}") def test_runs_list_can_mark_problem_run_reviewed(self) -> None: self.client.force_login(self.staff_user) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") run = BackupRun.objects.create(host=host, status=BackupRun.Status.FAILED, run_type=BackupRun.RunType.MANUAL) list_url = f'{reverse("runs_list")}?status=failed&review=needed' response = self.client.get(list_url) self.assertEqual(response.status_code, 200) self.assertContains(response, "Mark reviewed") self.assertContains(response, 'value="/runs/?status=failed&review=needed"', html=False) response = self.client.post( reverse("resolve_run_review", args=[run.id]), {"next": list_url}, follow=True, ) run.refresh_from_db() self.assertIsNotNone(run.reviewed_at) self.assertEqual(run.reviewed_by, self.staff_user.username) self.assertRedirects(response, list_url) self.assertContains(response, f"Run {run.id} marked reviewed.") self.assertNotContains(response, f"Run {run.id}", html=False) def test_snapshots_list_filters_by_host_and_kind(self) -> None: self.client.force_login(self.staff_user) web = HostConfig.objects.create(host="web-01", address="web-01.example.test") db = HostConfig.objects.create(host="db-01", address="db-01.example.test") manual = self._snapshot(web, "20260519-021500Z__MANUAL01", kind=SnapshotRecord.Kind.MANUAL) scheduled = self._snapshot(db, "20260519-021500Z__SCHED01", kind=SnapshotRecord.Kind.SCHEDULED) response = self.client.get(reverse("snapshots_list"), {"host": web.host, "kind": SnapshotRecord.Kind.MANUAL}) self.assertEqual(response.status_code, 200) self.assertContains(response, "Snapshots") self.assertContains(response, "Browse discovered scheduled, manual, and incomplete snapshots") self.assertContains(response, "Apply filters") self.assertContains(response, reverse("snapshots_list")) self.assertContains(response, "Clear") self.assertContains(response, manual.dirname) self.assertContains(response, "web-01") self.assertNotContains(response, scheduled.dirname) def test_schedules_list_filters_by_enabled_and_prune(self) -> None: self.client.force_login(self.staff_user) web = HostConfig.objects.create(host="web-01", address="web-01.example.test") db = HostConfig.objects.create(host="db-01", address="db-01.example.test") ScheduleConfig.objects.create(host=web, cron_expr="15 2 * * *", enabled=True, prune=True, last_status="success") ScheduleConfig.objects.create(host=db, cron_expr="30 3 * * *", enabled=False, prune=False, last_status="failed") response = self.client.get(reverse("schedules_list"), {"enabled": "yes", "prune": "yes"}) self.assertEqual(response.status_code, 200) self.assertContains(response, "Schedules") self.assertContains(response, "Review configured backup schedules") self.assertContains(response, "Apply filters") self.assertContains(response, reverse("schedules_list")) self.assertContains(response, "Clear") self.assertContains(response, "web-01") self.assertContains(response, "15 2 * * *") self.assertContains(response, "success") self.assertContains(response, "UTC") self.assertNotContains(response, "30 3 * * *") def test_dashboard_surfaces_retention_warnings(self) -> None: self.client.force_login(self.staff_user) host = HostConfig.objects.create( host="web-01", address="web-01.example.test", retention_daily=0, retention_weekly=0, retention_monthly=0, retention_yearly=0, ) ScheduleConfig.objects.create(host=host, cron_expr="15 2 * * *", enabled=True, prune=True, prune_max_delete=1) self._snapshot(host, "20260517-021500Z__OLDSNP1") self._snapshot(host, "20260518-021500Z__OLDSNP2") self._snapshot(host, "20260519-021500Z__NEWSNAP") SnapshotRecord.objects.create( host=host, kind=SnapshotRecord.Kind.INCOMPLETE, dirname="20260519-031500Z__BROKEN01", path=f"/backups/{host.host}/.incomplete/20260519-031500Z__BROKEN01", status="failed", started_at=datetime(2026, 5, 19, 3, 15, tzinfo=timezone.utc), ) response = self.client.get(reverse("dashboard")) self.assertEqual(response.status_code, 200) self.assertContains(response, "Scheduled prune would delete 2 snapshot(s), above max 1.") self.assertContains(response, "1 incomplete snapshot(s) need review.") self.assertContains(response, "Mark reviewed") def test_dashboard_ignores_reviewed_problem_runs(self) -> None: self.client.force_login(self.staff_user) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") BackupRun.objects.create( host=host, status=BackupRun.Status.FAILED, reviewed_at=datetime(2026, 5, 19, 4, 15, tzinfo=timezone.utc), reviewed_by="admin", ) BackupRun.objects.create( host=host, status=BackupRun.Status.WARNING, reviewed_at=datetime(2026, 5, 19, 4, 20, tzinfo=timezone.utc), reviewed_by="admin", ) response = self.client.get(reverse("dashboard")) self.assertEqual(response.status_code, 200) self.assertContains(response, "No queued, running, unreviewed warning/failed runs, or retention warnings.") self.assertNotContains(response, "failed 1") self.assertNotContains(response, "warning 1") def test_dashboard_links_latest_snapshot_for_each_host(self) -> None: self.client.force_login(self.staff_user) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") old_snapshot = self._snapshot(host, "20260518-021500Z__OLDSNAP") latest_snapshot = self._snapshot(host, "20260519-021500Z__NEWSNAP") response = self.client.get(reverse("dashboard")) self.assertEqual(response.status_code, 200) self.assertContains(response, "Latest Snapshot") self.assertContains(response, latest_snapshot.dirname) self.assertContains(response, reverse("snapshot_detail", args=[latest_snapshot.id])) self.assertNotContains(response, reverse("snapshot_detail", args=[old_snapshot.id])) def test_dashboard_prompts_for_global_config_when_database_is_empty(self) -> None: self.client.force_login(self.staff_user) response = self.client.get(reverse("dashboard")) self.assertEqual(response.status_code, 200) self.assertContains(response, "No default global config exists yet.") self.assertContains(response, reverse("edit_global_config")) self.assertContains(response, "Create global config") def test_dashboard_prompts_for_first_host_after_global_config_exists(self) -> None: self.client.force_login(self.staff_user) GlobalConfig.objects.create(name="default", backup_root="/opt/pobsync/backups") response = self.client.get(reverse("dashboard")) self.assertEqual(response.status_code, 200) self.assertContains(response, "Global config is ready.") self.assertContains(response, reverse("create_host_config")) self.assertContains(response, "Add first host") def test_self_check_renders_runtime_checks(self) -> None: self.client.force_login(self.staff_user) response = self.client.get(reverse("self_check")) self.assertEqual(response.status_code, 200) self.assertContains(response, "Self Check") self.assertContains(response, "Runtime, filesystem, service, and configuration checks") self.assertContains(response, "Django debug") self.assertContains(response, "Database connection") self.assertContains(response, "State root") def test_logs_view_renders_filtered_journal_messages(self) -> None: self.client.force_login(self.staff_user) completed = subprocess.CompletedProcess( args=["journalctl"], returncode=0, stdout=( "2026-05-19 pobsync-worker.service web-01 failed backup run 12\n" "2026-05-19 pobsync-worker.service web-02 failed backup run 12\n" "2026-05-19 pobsync-web.service web-01 started run 12\n" ), stderr="", ) with patch("pobsync_backend.views.shutil.which", return_value="/usr/bin/journalctl"), patch( "pobsync_backend.views.subprocess.run", return_value=completed ) as run: response = self.client.get( reverse("logs"), { "unit": "pobsync-worker.service", "priority": "0..3", "window": "6h", "host": "web-01", "run": "12", "q": "failed", }, ) self.assertEqual(response.status_code, 200) self.assertContains(response, "Logs") self.assertContains(response, "Filter pobsync service logs") self.assertContains(response, "Filter logs") self.assertContains(response, reverse("logs")) self.assertContains(response, "Clear") self.assertContains(response, "web-01 failed backup run 12") self.assertNotContains(response, "web-02 failed backup run 12") self.assertNotContains(response, "started") command = run.call_args.args[0] self.assertIn("-u", command) self.assertIn("pobsync-worker.service", command) self.assertIn("-p", command) self.assertIn("0..3", command) self.assertIn("--since", command) self.assertIn("6 hours ago", command) def test_purged_snapshots_view_renders_history(self) -> None: self.client.force_login(self.staff_user) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") PurgedSnapshot.objects.create( host=host, host_name=host.host, kind=SnapshotRecord.Kind.SCHEDULED, dirname="20260518-021500Z__OLDSNAP", path=f"/backups/{host.host}/scheduled/20260518-021500Z__OLDSNAP", reason="outside retention policy", action=PurgedSnapshot.Action.SCHEDULED, triggered_by="pobsync-scheduler", ) response = self.client.get(reverse("purged_snapshots")) self.assertEqual(response.status_code, 200) self.assertContains(response, "Purged Snapshots") self.assertContains(response, "Audit trail for snapshots removed") self.assertContains(response, "Apply filters") self.assertContains(response, reverse("purged_snapshots")) self.assertContains(response, "Clear") self.assertContains(response, "20260518-021500Z__OLDSNAP") self.assertContains(response, "outside retention policy") self.assertContains(response, "Scheduled") self.assertContains(response, "pobsync-scheduler") def test_purged_snapshots_view_filters_by_host_and_action(self) -> None: self.client.force_login(self.staff_user) web = HostConfig.objects.create(host="web-01", address="web-01.example.test") db = HostConfig.objects.create(host="db-01", address="db-01.example.test") PurgedSnapshot.objects.create( host=web, host_name=web.host, kind=SnapshotRecord.Kind.SCHEDULED, dirname="20260518-021500Z__WEBOLD", path=f"/backups/{web.host}/scheduled/20260518-021500Z__WEBOLD", reason="outside retention policy", action=PurgedSnapshot.Action.MANUAL, ) PurgedSnapshot.objects.create( host=db, host_name=db.host, kind=SnapshotRecord.Kind.INCOMPLETE, dirname="20260518-021500Z__DBBROKEN", path=f"/backups/{db.host}/.incomplete/20260518-021500Z__DBBROKEN", reason="manual incomplete cleanup", action=PurgedSnapshot.Action.INCOMPLETE_CLEANUP, ) response = self.client.get( reverse("purged_snapshots"), {"host": db.host, "action": PurgedSnapshot.Action.INCOMPLETE_CLEANUP}, ) self.assertEqual(response.status_code, 200) self.assertContains(response, "20260518-021500Z__DBBROKEN") self.assertNotContains(response, "20260518-021500Z__WEBOLD") def test_ssh_credentials_view_creates_key(self) -> None: self.client.force_login(self.staff_user) with patch("pobsync_backend.forms.validate_ssh_private_key", return_value="DERIVED PUBLIC KEY"): response = self.client.post( reverse("create_ssh_credential"), { "name": "backup-key", "private_key": "PRIVATE KEY", "public_key": "", "known_hosts": "web-01.example.test ssh-ed25519 AAAATEST", "notes": "production backup key", }, follow=True, ) self.assertRedirects(response, reverse("ssh_credentials")) self.assertContains(response, "Manage the key pairs pobsync uses") self.assertContains(response, "SSH credential saved for backup-key.") self.assertContains(response, "backup-key") credential = SshCredential.objects.get(name="backup-key") self.assertEqual(credential.private_key, "PRIVATE KEY\n") self.assertEqual(credential.public_key, "DERIVED PUBLIC KEY") def test_ssh_credentials_view_creates_key_from_uploaded_file(self) -> None: self.client.force_login(self.staff_user) uploaded_key = SimpleUploadedFile("id_ed25519", b"UPLOADED PRIVATE KEY\n", content_type="text/plain") with patch("pobsync_backend.forms.validate_ssh_private_key", return_value="DERIVED PUBLIC KEY"): response = self.client.post( reverse("create_ssh_credential"), { "name": "backup-key", "private_key_file": uploaded_key, "private_key": "", "public_key": "", "known_hosts": "", "notes": "uploaded", }, follow=True, ) self.assertRedirects(response, reverse("ssh_credentials")) credential = SshCredential.objects.get(name="backup-key") self.assertEqual(credential.private_key, "UPLOADED PRIVATE KEY\n") self.assertEqual(credential.public_key, "DERIVED PUBLIC KEY") def test_ssh_credential_forms_render_cancel_actions(self) -> None: self.client.force_login(self.staff_user) credential = SshCredential.objects.create(name="backup-key") create_response = self.client.get(reverse("create_ssh_credential")) edit_response = self.client.get(reverse("edit_ssh_credential", args=[credential.id])) generate_response = self.client.get(reverse("generate_ssh_credential")) for response in (create_response, edit_response, generate_response): self.assertEqual(response.status_code, 200) self.assertContains(response, "Cancel") self.assertContains(response, reverse("ssh_credentials")) self.assertContains(edit_response, "Delete SSH key") self.assertContains(edit_response, 'class="danger"', html=False) def test_ssh_credentials_view_generates_filesystem_key(self) -> None: self.client.force_login(self.staff_user) with TemporaryDirectory() as tmp, override_settings(POBSYNC_HOME=str(Path(tmp) / "home")): response = self.client.post( reverse("generate_ssh_credential"), { "name": "generated-key", "key_type": "ed25519", "set_global_default": "", "known_hosts": "", "notes": "generated", }, follow=True, ) self.assertRedirects(response, reverse("ssh_credentials")) self.assertContains(response, "SSH key generated for generated-key.") credential = SshCredential.objects.get(name="generated-key") self.assertTrue(credential.generated) self.assertEqual(credential.private_key, "") self.assertTrue(credential.public_key.startswith("ssh-ed25519 ")) self.assertTrue(Path(credential.key_path).exists()) def test_ssh_credentials_view_deletes_unused_generated_key(self) -> None: self.client.force_login(self.staff_user) with TemporaryDirectory() as tmp, override_settings(POBSYNC_HOME=str(Path(tmp) / "home")): credential = SshCredential.objects.create(name="generated-key") from pobsync_backend.ssh_keys import generate_ssh_key generate_ssh_key(credential) key_path = Path(credential.key_path) response = self.client.post( reverse("delete_ssh_credential", args=[credential.id]), {"confirm_name": credential.name}, follow=True, ) self.assertRedirects(response, reverse("ssh_credentials")) self.assertContains(response, "SSH key deleted: generated-key.") self.assertFalse(SshCredential.objects.exists()) self.assertFalse(key_path.exists()) def test_ssh_credentials_view_requires_delete_confirmation(self) -> None: self.client.force_login(self.staff_user) credential = SshCredential.objects.create(name="backup-key") response = self.client.post( reverse("delete_ssh_credential", args=[credential.id]), {"confirm_name": "wrong"}, follow=True, ) self.assertRedirects(response, reverse("edit_ssh_credential", args=[credential.id])) self.assertContains(response, "Type backup-key to confirm SSH key deletion.") self.assertTrue(SshCredential.objects.filter(pk=credential.pk).exists()) def test_ssh_credentials_view_blocks_delete_when_key_is_in_use(self) -> None: self.client.force_login(self.staff_user) credential = SshCredential.objects.create(name="backup-key") HostConfig.objects.create(host="web-01", address="web-01.example.test", ssh_credential=credential) response = self.client.post( reverse("delete_ssh_credential", args=[credential.id]), {"confirm_name": credential.name}, follow=True, ) self.assertRedirects(response, reverse("edit_ssh_credential", args=[credential.id])) self.assertContains(response, "SSH key backup-key is still in use and cannot be deleted.") self.assertTrue(SshCredential.objects.filter(pk=credential.pk).exists()) def test_ssh_credentials_view_rejects_invalid_key(self) -> None: self.client.force_login(self.staff_user) response = self.client.post( reverse("create_ssh_credential"), { "name": "bad-key", "private_key": "not a private key", "public_key": "", "known_hosts": "", "notes": "", }, ) self.assertEqual(response.status_code, 200) self.assertContains(response, "Invalid SSH private key") self.assertFalse(SshCredential.objects.exists()) def test_ssh_credentials_view_rejects_public_key_in_private_key_field(self) -> None: self.client.force_login(self.staff_user) response = self.client.post( reverse("create_ssh_credential"), { "name": "bad-key", "private_key": "ssh-ed25519 AAAATEST root@backup", "public_key": "", "known_hosts": "", "notes": "", }, ) self.assertEqual(response.status_code, 200) self.assertContains(response, "This looks like a public key") self.assertFalse(SshCredential.objects.exists()) def test_ssh_credentials_view_rejects_mismatched_public_key(self) -> None: self.client.force_login(self.staff_user) with patch("pobsync_backend.forms.validate_ssh_private_key", return_value="ssh-ed25519 AAAADERIVED derived"): response = self.client.post( reverse("create_ssh_credential"), { "name": "bad-key", "private_key": "PRIVATE KEY", "public_key": "ssh-ed25519 AAAAOTHER root@backup", "known_hosts": "", "notes": "", }, ) self.assertEqual(response.status_code, 200) self.assertContains(response, "Public key does not match") self.assertFalse(SshCredential.objects.exists()) def test_ssh_credentials_view_updates_existing_key(self) -> None: self.client.force_login(self.staff_user) credential = SshCredential.objects.create(name="backup-key", private_key="OLD KEY") with patch("pobsync_backend.forms.validate_ssh_private_key", return_value="UPDATED PUBLIC KEY"): response = self.client.post( reverse("edit_ssh_credential", args=[credential.id]), { "name": "renamed-backup-key", "private_key": "UPDATED KEY", "public_key": "", "known_hosts": "", "notes": "rotated", }, follow=True, ) self.assertRedirects(response, reverse("ssh_credentials")) credential.refresh_from_db() self.assertEqual(credential.name, "renamed-backup-key") self.assertEqual(credential.private_key, "UPDATED KEY\n") self.assertEqual(credential.public_key, "UPDATED PUBLIC KEY") self.assertEqual(credential.notes, "rotated") def test_global_config_form_creates_default_config(self) -> None: self.client.force_login(self.staff_user) credential = SshCredential.objects.create(name="backup-key", private_key="PRIVATE KEY") response = self.client.post( reverse("edit_global_config"), { "name": "default", "default_ssh_credential": str(credential.id), "ssh_user": "backup", "ssh_port": "2222", "ssh_options": "StrictHostKeyChecking=no\nBatchMode=yes", "rsync_binary": "rsync", "rsync_args": "-a\n--numeric-ids", "rsync_extra_args": "--delete", "rsync_timeout_seconds": "60", "rsync_bwlimit_kbps": "1000", "default_source_root": "/srv", "default_destination_subdir": "rootfs", "excludes_default": "*.tmp\ncache/", "retention_daily": "7", "retention_weekly": "4", "retention_monthly": "2", "retention_yearly": "1", }, follow=True, ) self.assertRedirects(response, reverse("dashboard")) self.assertContains(response, "Global config saved for default.") config = GlobalConfig.objects.get(name="default") self.assertEqual(config.backup_root, "/backups") self.assertEqual(config.default_ssh_credential, credential) self.assertEqual(config.ssh_user, "backup") self.assertEqual(config.ssh_port, 2222) self.assertEqual(config.ssh_options, ["StrictHostKeyChecking=no", "BatchMode=yes"]) self.assertEqual(config.rsync_args, ["-a", "--numeric-ids"]) self.assertEqual(config.rsync_extra_args, ["--delete"]) self.assertEqual(config.excludes_default, ["*.tmp", "cache/"]) self.assertEqual(config.retention_daily, 7) self.assertEqual(config.retention_yearly, 1) def test_global_config_form_defaults_to_first_generated_key(self) -> None: self.client.force_login(self.staff_user) credential = SshCredential.objects.create(name="default", key_path="/var/lib/pobsync/state/ssh-credentials/1/identity") response = self.client.get(reverse("edit_global_config")) self.assertEqual(response.status_code, 200) self.assertContains(response, "Defaults used by hosts unless a host overrides them") self.assertContains(response, f'value="{credential.id}" selected') self.assertContains(response, "--archive") self.assertContains(response, "/proc/***") self.assertContains(response, "Cancel") self.assertContains(response, reverse("dashboard")) def test_global_config_form_renders_static_container_backup_root_on_edit(self) -> None: self.client.force_login(self.staff_user) GlobalConfig.objects.create( name="default", backup_root="/mnt/pobsync/backups", ) response = self.client.get(reverse("edit_global_config")) self.assertEqual(response.status_code, 200) self.assertContains(response, "Backup root:") self.assertContains(response, "/backups") self.assertContains(response, "Config Check") self.assertContains(response, "Runtime backup root") self.assertContains(response, "Runtime state root") self.assertNotContains(response, "/opt/pobsync/backups") self.assertNotContains(response, "Pobsync home") self.assertNotContains(response, "Global pobsync home") def test_global_config_form_renders_config_check_for_non_recursive_rsync(self) -> None: self.client.force_login(self.staff_user) GlobalConfig.objects.create(name="default", backup_root="/backups", rsync_args=["--numeric-ids"]) response = self.client.get(reverse("edit_global_config")) self.assertEqual(response.status_code, 200) self.assertContains(response, "Global rsync recursion") self.assertContains(response, "Rsync args do not include archive or recursive transfer.") def test_global_config_form_resets_backup_root_to_static_container_path(self) -> None: self.client.force_login(self.staff_user) GlobalConfig.objects.create( name="default", backup_root="/mnt/pobsync/backups", ) response = self.client.post( reverse("edit_global_config"), { "name": "default", "ssh_user": "root", "ssh_port": "22", "ssh_options": "", "rsync_binary": "rsync", "rsync_args": "", "rsync_extra_args": "", "rsync_timeout_seconds": "0", "rsync_bwlimit_kbps": "0", "default_source_root": "/", "default_destination_subdir": "", "excludes_default": "", "retention_daily": "14", "retention_weekly": "8", "retention_monthly": "12", "retention_yearly": "0", }, follow=True, ) self.assertRedirects(response, reverse("dashboard")) config = GlobalConfig.objects.get(name="default") self.assertEqual(config.backup_root, "/backups") def test_create_host_config_form_creates_host(self) -> None: self.client.force_login(self.staff_user) credential = SshCredential.objects.create(name="host-key", private_key="PRIVATE KEY") response = self.client.post( reverse("create_host_config"), { "host": "web-01", "address": "web-01.example.test", "enabled": "on", "ssh_credential": str(credential.id), "ssh_user": "backup", "ssh_port": "2222", "source_root": "/srv", "includes": "/srv/www\n/srv/db", "excludes_add": "*.tmp", "excludes_replace": "", "rsync_extra_args": "--numeric-ids", "rsync_bwlimit_kbps": "4096", "retention_daily": "7", "retention_weekly": "4", "retention_monthly": "2", "retention_yearly": "1", }, follow=True, ) self.assertRedirects(response, reverse("host_detail", args=["web-01"])) self.assertContains(response, "Host config created for web-01.") host = HostConfig.objects.get(host="web-01") self.assertEqual(host.address, "web-01.example.test") self.assertEqual(host.ssh_credential, credential) self.assertEqual(host.ssh_user, "backup") self.assertEqual(host.includes, ["/srv/www", "/srv/db"]) self.assertEqual(host.excludes_add, ["*.tmp"]) self.assertEqual(host.rsync_extra_args, ["--numeric-ids"]) self.assertEqual(host.rsync_bwlimit_kbps, 4096) self.assertEqual(host.retention_weekly, 4) def test_create_host_config_uses_global_defaults_and_prepares_directories(self) -> None: self.client.force_login(self.staff_user) credential = SshCredential.objects.create(name="global-key", private_key="PRIVATE KEY") with TemporaryDirectory() as tmp: backup_root = Path(tmp) / "backups" GlobalConfig.objects.create( name="default", backup_root=str(backup_root), default_ssh_credential=credential, ssh_user="backup", ssh_port=2222, default_source_root="/srv", retention_daily=3, retention_weekly=2, retention_monthly=1, retention_yearly=0, ) get_response = self.client.get(reverse("create_host_config")) self.assertContains(get_response, 'value="backup"') self.assertContains(get_response, 'value="2222"') self.assertContains(get_response, 'value="/srv"') response = self.client.post( reverse("create_host_config"), { "host": "web-01", "address": "web-01.example.test", "enabled": "on", "ssh_credential": str(credential.id), "ssh_user": "backup", "ssh_port": "2222", "source_root": "/srv", "includes": "", "excludes_add": "", "excludes_replace": "", "rsync_extra_args": "", "retention_daily": "3", "retention_weekly": "2", "retention_monthly": "1", "retention_yearly": "0", }, follow=True, ) self.assertRedirects(response, reverse("host_detail", args=["web-01"])) self.assertContains(response, "prepared") self.assertTrue((backup_root / "web-01" / "scheduled").is_dir()) self.assertTrue((backup_root / "web-01" / "manual").is_dir()) self.assertTrue((backup_root / "web-01" / ".incomplete").is_dir()) def test_host_detail_renders_config_schedule_runs_and_snapshots(self) -> None: self.client.force_login(self.staff_user) with TemporaryDirectory() as tmp: backup_root = Path(tmp) GlobalConfig.objects.create(name="default", backup_root=str(backup_root), rsync_args=["--archive"]) host = HostConfig.objects.create( host="web-01", address="web-01.example.test", source_root="/srv", retention_daily=7, ) for subdir in ("scheduled", "manual", ".incomplete"): (backup_root / host.host / subdir).mkdir(parents=True) ScheduleConfig.objects.create(host=host, cron_expr="15 2 * * *", prune=True, last_status="success") snapshot = self._snapshot(host, "20260519-021500Z__ABCDEFGH") self._set_snapshot_storage(snapshot, allocated=100) BackupRun.objects.create(host=host, status=BackupRun.Status.SUCCESS, snapshot=snapshot) response = self.client.get(reverse("host_detail", args=[host.host])) self.assertEqual(response.status_code, 200) self.assertContains(response, "web-01") self.assertContains(response, "web-01.example.test") self.assertContains(response, "15 2 * * *") self.assertContains(response, "Schedule expression") self.assertContains(response, "Evaluated by the pobsync scheduler service.") self.assertContains(response, "Next run") self.assertContains(response, "UTC") self.assertContains(response, "20260519-021500Z__ABCDEFGH") self.assertContains(response, "Discover snapshots") self.assertContains(response, "Edit schedule") self.assertContains(response, "Edit config") self.assertContains(response, "Run connection preflight") self.assertContains(response, "Backup Control") self.assertContains(response, "Queue dry-run") self.assertContains(response, "Queue backup") self.assertContains(response, "Host Check") self.assertContains(response, reverse("prepare_host_directories", args=[host.host])) self.assertContains(response, "warning") self.assertContains(response, "Snapshot Storage") self.assertContains(response, "Backup Data") self.assertContains(response, "100 bytes", html=True) self.assertContains(response, reverse("queue_manual_backup", args=[host.host])) self.assertContains(response, reverse("run_detail", args=[BackupRun.objects.get().id])) self.assertContains(response, reverse("snapshot_detail", args=[snapshot.id])) self.assertContains(response, f'{reverse("runs_list")}?host={host.host}', html=False) self.assertContains(response, f'{reverse("snapshots_list")}?host={host.host}', html=False) def test_host_detail_renders_effective_config_preview(self) -> None: self.client.force_login(self.staff_user) credential = SshCredential.objects.create(name="default-key", key_path="/var/lib/pobsync/id_ed25519") GlobalConfig.objects.create( name="default", backup_root="/backups", default_ssh_credential=credential, ssh_user="root", ssh_port=2222, ssh_options=["-oBatchMode=yes"], rsync_args=["--archive", "--numeric-ids"], rsync_extra_args=["--delete"], rsync_timeout_seconds=300, rsync_bwlimit_kbps=2048, default_source_root="/", excludes_default=["/proc/***", "/sys/***"], retention_daily=14, retention_weekly=4, retention_monthly=2, retention_yearly=1, ) host = HostConfig.objects.create( host="web-01", address="web-01.example.test", includes=["/srv/www/***"], excludes_add=["/srv/www/cache/***"], rsync_extra_args=["--one-file-system"], ) response = self.client.get(reverse("host_detail", args=[host.host])) self.assertEqual(response.status_code, 200) self.assertContains(response, "Host") self.assertContains(response, "web-01.example.test") self.assertContains(response, "Effective Config") self.assertContains(response, "Backup source:") self.assertNotContains(response, "Source root:") self.assertContains(response, "root@web-01.example.test:2222") self.assertContains(response, "default-key") self.assertContains(response, "-oBatchMode=yes") self.assertContains(response, "--archive --numeric-ids --delete --one-file-system") self.assertContains(response, "2048 KB/s") self.assertContains(response, "/srv/www/***") self.assertContains(response, "/srv/www/cache/***") self.assertContains(response, "d14") self.assertContains(response, "w8") def test_run_host_preflight_stores_remote_check_result(self) -> None: self.client.force_login(self.staff_user) GlobalConfig.objects.create(name="default", backup_root="/backups", rsync_args=["--archive"]) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") with patch( "pobsync_backend.preflight.subprocess.run", return_value=subprocess.CompletedProcess(args=["ssh"], returncode=0, stdout="", stderr=""), ) as run: response = self.client.post(reverse("run_host_preflight", args=[host.host]), follow=True) self.assertRedirects(response, reverse("host_detail", args=[host.host])) self.assertContains(response, "Connection preflight passed for web-01.") self.assertContains(response, "Connection Preflight") self.assertContains(response, "SSH reachability") self.assertContains(response, "Remote rsync") self.assertContains(response, "Remote source root") self.assertEqual(run.call_count, 3) commands = [call.kwargs["args"] if "args" in call.kwargs else call.args[0] for call in run.call_args_list] self.assertEqual(commands[1][-1], "sh -lc 'command -v rsync >/dev/null'") self.assertEqual(commands[2][-1], "sh -lc 'test -e / && test -r /'") self.assertNotIn("sh", commands[2][commands[2].index("root@web-01.example.test") + 1 : -1]) host.refresh_from_db() self.assertTrue(host.config["last_preflight"]["ok"]) self.assertEqual(host.config["last_preflight"]["target"], "root@web-01.example.test") def test_queue_manual_backup_blocks_real_backup_after_failed_remote_preflight(self) -> None: self.client.force_login(self.staff_user) with TemporaryDirectory() as tmp: backup_root = Path(tmp) GlobalConfig.objects.create(name="default", backup_root=str(backup_root), rsync_args=["--archive"]) host = HostConfig.objects.create( host="web-01", address="web-01.example.test", config={ "last_preflight": { "ok": False, "target": "root@web-01.example.test", "source_root": "/", "rsync_binary": "rsync", "checks": [ { "name": "Remote rsync", "ok": False, "exit_code": 127, "message": "Remote rsync failed.", "detail": "rsync missing", } ], } }, ) for subdir in ("scheduled", "manual", ".incomplete"): (backup_root / host.host / subdir).mkdir(parents=True) response = self.client.post( reverse("queue_manual_backup", args=[host.host]), {"prune_max_delete": "10"}, follow=True, ) self.assertRedirects(response, reverse("host_detail", args=[host.host])) self.assertContains(response, "Cannot queue real backup until failed preflight checks are resolved") self.assertContains(response, "Remote preflight") self.assertFalse(BackupRun.objects.exists()) def test_host_detail_renders_backup_trends(self) -> None: self.client.force_login(self.staff_user) GlobalConfig.objects.create(name="default", backup_root="/backups") host = HostConfig.objects.create(host="web-01", address="web-01.example.test") snapshot = self._snapshot(host, "20260519-021500Z__ABCDEFGH") BackupRun.objects.create( host=host, run_type=BackupRun.RunType.MANUAL, status=BackupRun.Status.SUCCESS, snapshot=snapshot, started_at=datetime(2026, 5, 19, 2, 15, tzinfo=timezone.utc), result={ "ok": True, "dry_run": False, "stats": { "duration_seconds": 45, "rsync": { "files_total": 250, "literal_data_bytes": 2048, "matched_data_bytes": 8192, }, }, }, ) BackupRun.objects.create( host=host, status=BackupRun.Status.SUCCESS, snapshot=snapshot, started_at=datetime(2026, 5, 18, 2, 15, tzinfo=timezone.utc), result={ "ok": True, "dry_run": False, "stats": { "duration_seconds": 35, "rsync": { "files_total": 150, "literal_data_bytes": 1024, "matched_data_bytes": 4096, }, }, }, ) response = self.client.get(reverse("host_detail", args=[host.host])) self.assertEqual(response.status_code, 200) self.assertContains(response, "Backup Trends") self.assertContains(response, "Avg New Data") self.assertContains(response, "Avg Daily New") self.assertContains(response, "manual") self.assertContains(response, "45s") self.assertContains(response, "250") self.assertContains(response, "2.0") self.assertContains(response, "KB") self.assertContains(response, "Run data trend") self.assertContains(response, "width: 100%") self.assertContains(response, "width: 50%") def test_prepare_host_directories_action_creates_missing_directories(self) -> None: self.client.force_login(self.staff_user) with TemporaryDirectory() as tmp: backup_root = Path(tmp) / "backups" GlobalConfig.objects.create(name="default", backup_root=str(backup_root)) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") response = self.client.post(reverse("prepare_host_directories", args=[host.host]), follow=True) self.assertRedirects(response, reverse("host_detail", args=[host.host])) self.assertContains(response, "Prepared backup directories") self.assertTrue((backup_root / host.host / "scheduled").is_dir()) self.assertTrue((backup_root / host.host / "manual").is_dir()) self.assertTrue((backup_root / host.host / ".incomplete").is_dir()) def test_host_detail_warns_when_rsync_is_not_recursive(self) -> None: self.client.force_login(self.staff_user) GlobalConfig.objects.create(name="default", backup_root="/backups", rsync_args=[]) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") response = self.client.get(reverse("host_detail", args=[host.host])) self.assertEqual(response.status_code, 200) self.assertContains(response, "Host effective rsync recursion") self.assertContains(response, "Rsync args do not include archive or recursive transfer.") def test_host_detail_warns_when_replace_excludes_drops_root_defaults(self) -> None: self.client.force_login(self.staff_user) GlobalConfig.objects.create( name="default", backup_root="/backups", rsync_args=["--archive"], excludes_default=["/proc/***", "/sys/***", "/dev/***", "/run/***", "/tmp/***"], ) host = HostConfig.objects.create( host="web-01", address="web-01.example.test", excludes_replace=[], ) response = self.client.get(reverse("host_detail", args=[host.host])) self.assertEqual(response.status_code, 200) self.assertContains(response, "Effective root excludes") self.assertContains(response, "excludes_replace is active") def test_host_detail_warns_that_includes_are_raw_rsync_rules(self) -> None: self.client.force_login(self.staff_user) GlobalConfig.objects.create(name="default", backup_root="/backups", rsync_args=["--archive"]) host = HostConfig.objects.create( host="web-01", address="web-01.example.test", includes=["/srv/www"], ) response = self.client.get(reverse("host_detail", args=[host.host])) self.assertEqual(response.status_code, 200) self.assertContains(response, "Host includes") self.assertContains(response, "Includes are passed to rsync as raw --include rules.") def test_scan_host_known_key_action_updates_selected_credential(self) -> None: self.client.force_login(self.staff_user) credential = SshCredential.objects.create(name="default-key", key_path="/var/lib/pobsync/state/ssh-credentials/1/identity") GlobalConfig.objects.create(name="default", backup_root="/backups", default_ssh_credential=credential, ssh_port=2222) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") with patch( "pobsync_backend.views.scan_known_host", return_value="web-01.example.test ssh-ed25519 AAAASCANNED", ) as scan: response = self.client.post(reverse("scan_host_known_key", args=[host.host]), follow=True) self.assertRedirects(response, reverse("host_detail", args=[host.host])) self.assertContains(response, "Stored SSH host key for web-01") scan.assert_called_once_with("web-01.example.test", port=2222) credential.refresh_from_db() self.assertEqual(credential.known_hosts, "web-01.example.test ssh-ed25519 AAAASCANNED\n") def test_host_detail_surfaces_active_backup_run(self) -> None: self.client.force_login(self.staff_user) GlobalConfig.objects.create(name="default", backup_root="/backups") host = HostConfig.objects.create(host="web-01", address="web-01.example.test") run = BackupRun.objects.create(host=host, status=BackupRun.Status.QUEUED) response = self.client.get(reverse("host_detail", args=[host.host])) self.assertEqual(response.status_code, 200) self.assertContains(response, "queued") self.assertContains(response, f"Run {run.id}") self.assertContains(response, reverse("run_detail", args=[run.id])) def test_host_detail_disables_backup_controls_without_global_config(self) -> None: self.client.force_login(self.staff_user) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") response = self.client.get(reverse("host_detail", args=[host.host])) self.assertEqual(response.status_code, 200) self.assertContains(response, "missing global config") self.assertContains(response, "Create the default global config before queueing backups.") def test_host_detail_renders_discovery_status_for_existing_snapshot_dirs(self) -> None: self.client.force_login(self.staff_user) with TemporaryDirectory() as tmp: backup_root = Path(tmp) / "backups" GlobalConfig.objects.create(name="default", backup_root=str(backup_root)) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") (backup_root / host.host / "scheduled" / "20260519-021500Z__ABCDEFGH").mkdir(parents=True) (backup_root / host.host / ".incomplete" / "20260519-031500Z__BROKEN01").mkdir(parents=True) response = self.client.get(reverse("host_detail", args=[host.host])) self.assertEqual(response.status_code, 200) self.assertContains(response, "Host root") self.assertContains(response, str(backup_root / host.host)) self.assertContains(response, "Found 2 snapshot directories") self.assertContains(response, "scheduled 1") self.assertContains(response, "incomplete 1") def test_host_detail_returns_404_for_unknown_host(self) -> None: self.client.force_login(self.staff_user) response = self.client.get(reverse("host_detail", args=["missing-host"])) self.assertEqual(response.status_code, 404) def test_queue_manual_backup_creates_queued_run_and_redirects_to_run_detail(self) -> None: self.client.force_login(self.staff_user) GlobalConfig.objects.create(name="default", backup_root="/backups", rsync_args=["--archive"]) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") response = self.client.post( reverse("queue_manual_backup", args=[host.host]), { "dry_run": "on", "verbose_output": "on", "prune": "on", "prune_max_delete": "4", "prune_protect_bases": "on", }, follow=True, ) run = BackupRun.objects.get(host=host) self.assertRedirects(response, reverse("run_detail", args=[run.id])) self.assertContains(response, f"Queued manual backup run {run.id} for web-01.") self.assertEqual(run.status, BackupRun.Status.QUEUED) self.assertEqual(run.run_type, BackupRun.RunType.MANUAL) self.assertEqual( run.result["requested"], { "dry_run": True, "verbose_output": True, "prune": True, "prune_max_delete": 4, "prune_protect_bases": True, }, ) def test_queue_manual_backup_quick_action_can_queue_real_backup(self) -> None: self.client.force_login(self.staff_user) with TemporaryDirectory() as tmp: backup_root = Path(tmp) GlobalConfig.objects.create(name="default", backup_root=str(backup_root), rsync_args=["--archive"]) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") for subdir in ("scheduled", "manual", ".incomplete"): (backup_root / host.host / subdir).mkdir(parents=True) response = self.client.post( reverse("queue_manual_backup", args=[host.host]), {"prune_max_delete": "10"}, follow=True, ) run = BackupRun.objects.get(host=host) self.assertRedirects(response, reverse("run_detail", args=[run.id])) self.assertEqual( run.result["requested"], { "dry_run": False, "verbose_output": False, "prune": False, "prune_max_delete": 10, "prune_protect_bases": False, }, ) def test_queue_manual_backup_blocks_real_backup_when_host_directories_are_missing(self) -> None: self.client.force_login(self.staff_user) with TemporaryDirectory() as tmp: backup_root = Path(tmp) GlobalConfig.objects.create(name="default", backup_root=str(backup_root), rsync_args=["--archive"]) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") response = self.client.post( reverse("queue_manual_backup", args=[host.host]), {"prune_max_delete": "10"}, follow=True, ) self.assertRedirects(response, reverse("host_detail", args=[host.host])) self.assertContains(response, "Cannot queue real backup until failed preflight checks are resolved") self.assertContains(response, "Host backup root") self.assertFalse(BackupRun.objects.exists()) def test_queue_manual_backup_allows_dry_run_with_only_storage_preflight_failures(self) -> None: self.client.force_login(self.staff_user) with TemporaryDirectory() as tmp: backup_root = Path(tmp) GlobalConfig.objects.create(name="default", backup_root=str(backup_root), rsync_args=["--archive"]) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") response = self.client.post( reverse("queue_manual_backup", args=[host.host]), {"dry_run": "on", "verbose_output": "on", "prune_max_delete": "10"}, follow=True, ) run = BackupRun.objects.get(host=host) self.assertRedirects(response, reverse("run_detail", args=[run.id])) self.assertEqual(run.result["requested"]["dry_run"], True) def test_queue_manual_backup_requires_default_global_config(self) -> None: self.client.force_login(self.staff_user) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") response = self.client.post(reverse("queue_manual_backup", args=[host.host]), {"dry_run": "on"}, follow=True) self.assertRedirects(response, reverse("host_detail", args=[host.host])) self.assertContains(response, "Create the default global config before queueing backups.") self.assertFalse(BackupRun.objects.exists()) def test_queue_manual_backup_rejects_disabled_host(self) -> None: self.client.force_login(self.staff_user) GlobalConfig.objects.create(name="default", backup_root="/backups", rsync_args=["--archive"]) host = HostConfig.objects.create(host="web-01", address="web-01.example.test", enabled=False) response = self.client.post(reverse("queue_manual_backup", args=[host.host]), {"dry_run": "on"}, follow=True) self.assertRedirects(response, reverse("host_detail", args=[host.host])) self.assertContains(response, "Cannot queue backup for disabled host web-01.") self.assertFalse(BackupRun.objects.exists()) def test_queue_manual_backup_requires_post(self) -> None: self.client.force_login(self.staff_user) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") response = self.client.get(reverse("queue_manual_backup", args=[host.host])) self.assertEqual(response.status_code, 405) def test_run_detail_renders_result_payload(self) -> None: self.client.force_login(self.staff_user) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") snapshot = self._snapshot(host, "20260519-021500Z__ABCDEFGH") run = BackupRun.objects.create( host=host, status=BackupRun.Status.SUCCESS, snapshot=snapshot, snapshot_path=snapshot.path, base_path="/backups/web-01/scheduled/base", rsync_exit_code=0, result={ "ok": True, "snapshot": snapshot.path, "rsync": { "command": ["rsync", "--archive", "--bwlimit=2048", "root@web-01:/", snapshot.path], "exit_code": 0, "log_tail": ["sending incremental file list", "sent 500 bytes"], "bwlimit_kbps": 2048, }, "requested": { "dry_run": True, "verbose_output": True, "prune": False, "prune_max_delete": 10, "prune_protect_bases": False, }, "stats": { "duration_seconds": 12, "rsync": { "files_total": 10, "files_transferred": 2, "total_file_size_bytes": 2000, "total_transferred_file_size_bytes": 500, "literal_data_bytes": 500, "matched_data_bytes": 1500, "link_dest_estimated_savings_bytes": 1500, }, }, }, ) response = self.client.get(reverse("run_detail", args=[run.id])) self.assertEqual(response.status_code, 200) self.assertContains(response, "Run") self.assertContains(response, "web-01") self.assertContains(response, "success") self.assertContains(response, "ABCDEFGH") self.assertContains(response, "Requested Options") self.assertContains(response, "Dry run: yes") self.assertContains(response, "Verbose rsync output: yes") self.assertContains(response, "Rsync Command") self.assertContains(response, "Bandwidth limit:") self.assertContains(response, "2048 KB/s") self.assertContains(response, "--archive") self.assertContains(response, "Rsync Log") self.assertContains(response, "sending incremental file list") self.assertContains(response, "Run Progress") self.assertContains(response, "dry run") self.assertContains(response, "Files Seen") self.assertContains(response, "Would Transfer") self.assertContains(response, "Transfer Estimate") self.assertContains(response, "Warnings: none recorded") self.assertContains(response, "Stats") self.assertContains(response, "Files seen: 10") self.assertContains(response, "Estimated link-dest saving") self.assertContains(response, ""ok": true") self.assertContains(response, reverse("snapshot_detail", args=[snapshot.id])) def test_run_detail_surfaces_dry_run_warnings_and_log_link(self) -> None: self.client.force_login(self.staff_user) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") with TemporaryDirectory() as tmp: log_path = Path(tmp) / "dry-run" / "rsync.log" log_path.parent.mkdir(parents=True) log_path.write_text("WARNING: noisy shell output\npermission denied\n", encoding="utf-8") run = BackupRun.objects.create( host=host, status=BackupRun.Status.FAILED, rsync_exit_code=255, result={ "ok": False, "dry_run": True, "log": str(log_path), "failure": { "category": "transport", "message": "Rsync transport failed.", "hint": "Check SSH access.", }, "stats": { "duration_seconds": 4, "rsync": { "files_total": 25, "files_transferred": 3, "total_file_size_bytes": 10_000, "total_transferred_file_size_bytes": 1_500, }, }, "rsync": { "exit_code": 255, "log_tail": ["WARNING: noisy shell output", "permission denied"], }, }, ) response = self.client.get(reverse("run_detail", args=[run.id])) self.assertEqual(response.status_code, 200) self.assertContains(response, "Run Progress") self.assertContains(response, "dry run") self.assertContains(response, "failed") self.assertContains(response, "Files Seen") self.assertContains(response, "25") self.assertContains(response, "Would Transfer") self.assertContains(response, "3") self.assertContains(response, "1.5") self.assertContains(response, "Open full rsync log") self.assertContains(response, reverse("run_rsync_log", args=[run.id])) self.assertContains(response, "Rsync transport failed.") self.assertContains(response, "Check SSH access.") self.assertContains(response, "WARNING: noisy shell output") def test_run_detail_links_existing_rsync_log(self) -> None: self.client.force_login(self.staff_user) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") with TemporaryDirectory() as tmp: log_path = Path(tmp) / "snapshot" / "meta" / "rsync.log" log_path.parent.mkdir(parents=True) log_path.write_text("old line\nrsync log line\n", encoding="utf-8") run = BackupRun.objects.create( host=host, status=BackupRun.Status.SUCCESS, snapshot_path=str(log_path.parent.parent), result={"ok": True, "log": str(log_path)}, ) response = self.client.get(reverse("run_detail", args=[run.id])) log_response = self.client.get(reverse("run_rsync_log", args=[run.id])) log_body = b"".join(log_response.streaming_content) self.assertContains(response, reverse("run_rsync_log", args=[run.id])) self.assertContains(response, str(log_path)) self.assertContains(response, "rsync log line") self.assertEqual(log_response.status_code, 200) self.assertEqual(log_body, b"old line\nrsync log line\n") def test_run_detail_surfaces_failure_and_retention_warning(self) -> None: self.client.force_login(self.staff_user) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") run = BackupRun.objects.create( host=host, status=BackupRun.Status.WARNING, rsync_exit_code=0, result={ "ok": True, "failure": { "category": "transport", "summary": "SSH connection dropped.", "hint": "Check network connectivity.", }, "prune": { "ok": True, "source": "sql", "kind": "scheduled", "planned_delete_count": 1, "max_delete": 1, "protect_bases": True, "incomplete_ignored_count": 1, "deleted": [{"dirname": "20260518-021500Z__OLD"}], "actions": ["deleted scheduled 20260518-021500Z__OLD"], }, }, ) response = self.client.get(reverse("run_detail", args=[run.id])) self.assertEqual(response.status_code, 200) self.assertContains(response, "Backup run") self.assertContains(response, "web-01") self.assertContains(response, "Failure") self.assertContains(response, "transport") self.assertContains(response, "Check network connectivity.") self.assertContains(response, "Retention") self.assertContains(response, "Planned deletions") self.assertNotContains(response, "Source: sql") self.assertContains(response, "Max delete") self.assertContains(response, "Protect bases") self.assertContains(response, "Incomplete ignored") self.assertContains(response, "deleted scheduled 20260518-021500Z__OLD") def test_run_review_action_marks_problem_run_reviewed(self) -> None: self.client.force_login(self.staff_user) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") run = BackupRun.objects.create(host=host, status=BackupRun.Status.FAILED, result={"ok": False}) response = self.client.post(reverse("resolve_run_review", args=[run.id]), follow=True) run.refresh_from_db() self.assertIsNotNone(run.reviewed_at) self.assertEqual(run.reviewed_by, self.staff_user.username) self.assertRedirects(response, reverse("run_detail", args=[run.id])) self.assertContains(response, f"Run {run.id} marked reviewed.") self.assertContains(response, "Review") self.assertContains(response, self.staff_user.username) self.assertNotContains(response, "Mark reviewed") def test_run_review_action_ignores_successful_run(self) -> None: self.client.force_login(self.staff_user) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") run = BackupRun.objects.create(host=host, status=BackupRun.Status.SUCCESS, result={"ok": True}) response = self.client.post(reverse("resolve_run_review", args=[run.id]), follow=True) run.refresh_from_db() self.assertIsNone(run.reviewed_at) self.assertRedirects(response, reverse("run_detail", args=[run.id])) self.assertContains(response, f"Run {run.id} does not need review.") def test_run_detail_surfaces_host_retention_warnings(self) -> None: self.client.force_login(self.staff_user) host = HostConfig.objects.create( host="web-01", address="web-01.example.test", retention_daily=0, retention_weekly=0, retention_monthly=0, retention_yearly=0, ) ScheduleConfig.objects.create(host=host, cron_expr="15 2 * * *", enabled=True, prune=True, prune_max_delete=1) self._snapshot(host, "20260517-021500Z__OLDSNP1") self._snapshot(host, "20260518-021500Z__OLDSNP2") self._snapshot(host, "20260519-021500Z__NEWSNAP") SnapshotRecord.objects.create( host=host, kind=SnapshotRecord.Kind.INCOMPLETE, dirname="20260519-031500Z__BROKEN01", path=f"/backups/{host.host}/.incomplete/20260519-031500Z__BROKEN01", status="failed", started_at=datetime(2026, 5, 19, 3, 15, tzinfo=timezone.utc), ) run = BackupRun.objects.create(host=host, status=BackupRun.Status.SUCCESS, result={"ok": True}) response = self.client.get(reverse("run_detail", args=[run.id])) self.assertEqual(response.status_code, 200) self.assertContains(response, "Retention Warnings") self.assertContains(response, "Scheduled pruning for this host would delete 2 snapshot(s)") self.assertContains(response, "1 incomplete snapshot(s) exist for this host") def test_run_detail_infers_rsync_log_from_snapshot_path(self) -> None: self.client.force_login(self.staff_user) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") with TemporaryDirectory() as tmp: snapshot_path = Path(tmp) / "snapshot" log_path = snapshot_path / "meta" / "rsync.log" log_path.parent.mkdir(parents=True) log_path.write_text("scheduled log\n", encoding="utf-8") run = BackupRun.objects.create( host=host, status=BackupRun.Status.SUCCESS, snapshot_path=str(snapshot_path), result={"ok": True}, ) response = self.client.get(reverse("run_rsync_log", args=[run.id])) body = b"".join(response.streaming_content) self.assertEqual(response.status_code, 200) self.assertEqual(body, b"scheduled log\n") def test_run_detail_offers_cancel_for_running_run(self) -> None: self.client.force_login(self.staff_user) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") run = BackupRun.objects.create(host=host, status=BackupRun.Status.RUNNING) response = self.client.get(reverse("run_detail", args=[run.id])) self.assertEqual(response.status_code, 200) self.assertContains(response, "Run Control") self.assertContains(response, "Cancelling a queued run stops it immediately") self.assertContains(response, "Cancel run") self.assertContains(response, reverse("cancel_run", args=[run.id])) self.assertContains(response, 'class="danger"', html=False) def test_run_detail_enables_live_refresh_for_active_run(self) -> None: self.client.force_login(self.staff_user) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") run = BackupRun.objects.create(host=host, status=BackupRun.Status.RUNNING) response = self.client.get(reverse("run_detail", args=[run.id])) self.assertEqual(response.status_code, 200) self.assertContains(response, f'data-refresh-url="{reverse("run_detail_live", args=[run.id])}"', html=False) self.assertContains(response, 'data-refresh-interval="5000"', html=False) self.assertContains(response, 'data-refresh-active="true"', html=False) self.assertContains(response, "Live Updates") self.assertContains(response, "Pause refresh") def test_run_detail_live_returns_partial_for_active_run(self) -> None: self.client.force_login(self.staff_user) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") run = BackupRun.objects.create( host=host, status=BackupRun.Status.RUNNING, result={"rsync": {"log_tail": ["sending incremental file list"]}}, ) response = self.client.get(reverse("run_detail_live", args=[run.id])) self.assertEqual(response.status_code, 200) self.assertEqual(response["X-Pobsync-Refresh-Active"], "true") self.assertContains(response, "Run Control") self.assertContains(response, "sending incremental file list") self.assertNotContains(response, " None: self.client.force_login(self.staff_user) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") with TemporaryDirectory() as tmp: snapshot_path = Path(tmp) / "backups" / host.host / ".incomplete" / "20260523-010000Z__ABCDEFGH" data_path = snapshot_path / "data" log_path = snapshot_path / "meta" / "rsync.log" data_path.mkdir(parents=True) log_path.parent.mkdir(parents=True) (data_path / "payload.txt").write_text("payload", encoding="utf-8") log_path.write_text("sending incremental file list\npayload.txt\n", encoding="utf-8") run = BackupRun.objects.create( host=host, status=BackupRun.Status.RUNNING, snapshot_path=str(snapshot_path), result={ "requested": {"dry_run": False}, "execution": { "phase": "rsync", "snapshot": str(snapshot_path), "log": str(log_path), "heartbeat_at": "2026-05-23T01:00:00+02:00", }, "rsync": {"pid": 1234, "pgid": 1234, "command": ["rsync"]}, }, ) response = self.client.get(reverse("run_detail_live", args=[run.id])) self.assertEqual(response.status_code, 200) self.assertContains(response, "Run Progress") self.assertContains(response, "backup") self.assertContains(response, "rsync") self.assertContains(response, "1234") self.assertContains(response, "Data Files") self.assertContains(response, "Open full rsync log") self.assertContains(response, "payload.txt") self.assertContains(response, "sending incremental file list") def test_run_detail_live_stops_refresh_for_terminal_run(self) -> None: self.client.force_login(self.staff_user) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") run = BackupRun.objects.create(host=host, status=BackupRun.Status.SUCCESS) response = self.client.get(reverse("run_detail_live", args=[run.id])) self.assertEqual(response.status_code, 200) self.assertEqual(response["X-Pobsync-Refresh-Active"], "false") self.assertNotContains(response, "Run Control") def test_run_detail_renders_worker_execution_metadata(self) -> None: self.client.force_login(self.staff_user) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") run = BackupRun.objects.create( host=host, status=BackupRun.Status.RUNNING, result={ "execution": { "worker_host": "backup-01", "worker_pid": 4242, "heartbeat_at": "2026-05-21T10:30:00+00:00", } }, ) response = self.client.get(reverse("run_detail", args=[run.id])) self.assertEqual(response.status_code, 200) self.assertContains(response, "Worker:") self.assertContains(response, "backup-01") self.assertContains(response, "pid 4242") self.assertContains(response, "Worker heartbeat:") def test_cancel_run_marks_queued_run_cancelled(self) -> None: self.client.force_login(self.staff_user) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") run = BackupRun.objects.create(host=host, status=BackupRun.Status.QUEUED) response = self.client.post(reverse("cancel_run", args=[run.id]), follow=True) self.assertRedirects(response, reverse("run_detail", args=[run.id])) self.assertContains(response, "Cancellation requested") run.refresh_from_db() self.assertEqual(run.status, BackupRun.Status.CANCELLED) self.assertIsNotNone(run.ended_at) self.assertEqual(run.result["cancellation"]["previous_status"], BackupRun.Status.QUEUED) def test_cancel_run_requests_running_run_cancellation(self) -> None: self.client.force_login(self.staff_user) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") run = BackupRun.objects.create(host=host, status=BackupRun.Status.RUNNING) response = self.client.post(reverse("cancel_run", args=[run.id]), follow=True) self.assertRedirects(response, reverse("run_detail", args=[run.id])) run.refresh_from_db() self.assertEqual(run.status, BackupRun.Status.CANCELLED) self.assertIsNone(run.ended_at) self.assertEqual(run.result["cancellation"]["previous_status"], BackupRun.Status.RUNNING) def test_snapshot_detail_renders_metadata_runs_and_children(self) -> None: self.client.force_login(self.staff_user) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") base = self._snapshot(host, "20260518-021500Z__BASESNAP") base.metadata = { "status": "success", "snapshot_id": "BASESNAP", "stats": { "duration_seconds": 20, "rsync": { "files_total": 100, "files_transferred": 4, "total_file_size_bytes": 10_000, "link_dest_estimated_savings_bytes": 7_000, }, "storage": { "snapshot": { "apparent_size_bytes": 10_000, "allocated_size_bytes": 3_000, "hardlinked_files": 9, }, "capacity": { "used_percent": 30.5, "available_bytes": 1_000_000, }, }, }, } base.save(update_fields=["metadata"]) child = self._snapshot(host, "20260519-021500Z__CHILDSNP") child.base = base child.base_dirname = base.dirname child.save(update_fields=["base", "base_dirname"]) run = BackupRun.objects.create(host=host, status=BackupRun.Status.SUCCESS, snapshot=base) response = self.client.get(reverse("snapshot_detail", args=[base.id])) self.assertEqual(response.status_code, 200) self.assertContains(response, "Snapshot") self.assertContains(response, base.dirname) self.assertContains(response, "BASESNAP") self.assertContains(response, "Stats") self.assertContains(response, "Files seen: 100") self.assertContains(response, "Hardlinked files: 9") self.assertContains(response, "Restore Guidance") self.assertContains(response, "Snapshot data path:") self.assertNotContains(response, "Snapshot data source:") self.assertContains(response, "Dry-run restore back to the original host:") self.assertNotContains(response, "Dry-run restore back to the source host:") self.assertContains(response, f"{base.path}/data") self.assertContains(response, f"/restore/{host.host}") self.assertContains(response, "rsync -aHAX --numeric-ids --info=progress2 --dry-run") self.assertContains(response, f"{base.path}/data/") self.assertContains(response, "root@web-01.example.test:/") self.assertContains(response, "Dry-run a directory restore") self.assertContains(response, f"{base.path}/data/etc/nginx/") self.assertContains(response, f"/restore/{host.host}/etc/nginx/") self.assertContains(response, "Dry-run a single file restore") self.assertContains(response, f"{base.path}/data/home/example/site/public_html/index.php") self.assertContains(response, f"/restore/{host.host}/home/example/site/public_html/index.php") self.assertContains(response, "Treat snapshot directories as read-only") self.assertContains(response, child.dirname) self.assertContains(response, f"Run {run.id}") self.assertContains(response, reverse("run_detail", args=[run.id])) self.assertContains(response, reverse("snapshot_detail", args=[child.id])) def test_discover_host_snapshots_action_discovers_and_redirects(self) -> None: self.client.force_login(self.staff_user) with TemporaryDirectory() as tmp: backup_root = Path(tmp) / "backups" GlobalConfig.objects.create(name="default", backup_root=str(backup_root)) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") snapshot_dir = backup_root / host.host / "scheduled" / "20260519-021500Z__ABCDEFGH" meta_dir = snapshot_dir / "meta" meta_dir.mkdir(parents=True) write_yaml_atomic(meta_dir / "meta.yaml", {"status": "success", "started_at": "2026-05-19T02:15:00Z"}) response = self.client.post(reverse("discover_host_snapshots", args=[host.host]), follow=True) self.assertRedirects(response, reverse("host_detail", args=[host.host])) self.assertContains(response, "Snapshot discovery scanned 1 items") self.assertTrue(SnapshotRecord.objects.filter(host=host, dirname=snapshot_dir.name).exists()) def test_discover_host_snapshots_warns_when_host_root_is_missing(self) -> None: self.client.force_login(self.staff_user) with TemporaryDirectory() as tmp: backup_root = Path(tmp) / "backups" GlobalConfig.objects.create(name="default", backup_root=str(backup_root)) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") response = self.client.post(reverse("discover_host_snapshots", args=[host.host]), follow=True) self.assertRedirects(response, reverse("host_detail", args=[host.host])) self.assertContains(response, "Snapshot discovery scanned 0 items") self.assertContains(response, "Host backup directory does not exist yet") self.assertFalse(SnapshotRecord.objects.exists()) def test_discover_host_snapshots_requires_post(self) -> None: self.client.force_login(self.staff_user) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") response = self.client.get(reverse("discover_host_snapshots", args=[host.host])) self.assertEqual(response.status_code, 405) def test_retention_plan_renders_keep_and_delete_sets(self) -> None: self.client.force_login(self.staff_user) host = HostConfig.objects.create( host="web-01", address="web-01.example.test", retention_daily=0, retention_weekly=0, retention_monthly=0, retention_yearly=0, ) old_snapshot = self._snapshot(host, "20260518-021500Z__OLDSNAP") new_snapshot = self._snapshot(host, "20260519-021500Z__NEWSNAP") response = self.client.get(reverse("host_retention_plan", args=[host.host])) self.assertEqual(response.status_code, 200) self.assertContains(response, "Retention") self.assertContains(response, "Preview which snapshots stay") self.assertContains(response, "web-01") self.assertContains(response, old_snapshot.dirname) self.assertContains(response, new_snapshot.dirname) self.assertContains(response, "newest") self.assertContains(response, "Would Delete") self.assertContains(response, "outside retention policy") self.assertNotContains(response, "
Source
", html=True) self.assertContains(response, "Confirm delete count") self.assertContains(response, "Type 1 to confirm the current number of planned deletions.") self.assertContains(response, "This permanently deletes the snapshot directories listed in Would Delete.") self.assertContains(response, 'class="danger"', html=False) self.assertContains(response, "Cancel") def test_retention_plan_warns_when_scheduled_prune_limit_is_exceeded(self) -> None: self.client.force_login(self.staff_user) host = HostConfig.objects.create( host="web-01", address="web-01.example.test", retention_daily=0, retention_weekly=0, retention_monthly=0, retention_yearly=0, ) ScheduleConfig.objects.create(host=host, cron_expr="15 2 * * *", enabled=True, prune=True, prune_max_delete=1) self._snapshot(host, "20260517-021500Z__OLDSNP1") self._snapshot(host, "20260518-021500Z__OLDSNP2") self._snapshot(host, "20260519-021500Z__NEWSNAP") response = self.client.get(reverse("host_retention_plan", args=[host.host])) self.assertEqual(response.status_code, 200) self.assertContains(response, "Scheduled Prune Limit") self.assertContains(response, "would delete 2 snapshot(s)") self.assertContains(response, "scheduled prune limit of") self.assertContains(response, "Schedule max delete: 1") def test_retention_plan_can_enable_base_protection(self) -> None: self.client.force_login(self.staff_user) host = HostConfig.objects.create( host="web-01", address="web-01.example.test", retention_daily=0, retention_weekly=0, retention_monthly=0, retention_yearly=0, ) base = self._snapshot(host, "20260518-021500Z__BASESNAP") child = self._snapshot(host, "20260519-021500Z__CHILDSNP") child.base = base child.save(update_fields=["base"]) response = self.client.get(reverse("host_retention_plan", args=[host.host]), {"protect_bases": "1"}) self.assertEqual(response.status_code, 200) self.assertContains(response, "Protect bases: yes") self.assertContains(response, "Base snapshots referenced by kept snapshots") self.assertContains(response, f"base-of:{child.dirname}") def test_retention_plan_surfaces_incomplete_snapshots_without_deleting_them(self) -> None: self.client.force_login(self.staff_user) host = HostConfig.objects.create( host="web-01", address="web-01.example.test", retention_daily=0, retention_weekly=0, retention_monthly=0, retention_yearly=0, ) self._snapshot(host, "20260518-021500Z__OLDSNAP") self._snapshot(host, "20260519-021500Z__NEWSNAP") SnapshotRecord.objects.create( host=host, kind=SnapshotRecord.Kind.INCOMPLETE, dirname="20260519-031500Z__BROKEN01", path=f"/backups/{host.host}/.incomplete/20260519-031500Z__BROKEN01", status="failed", started_at=datetime(2026, 5, 19, 3, 15, tzinfo=timezone.utc), ) response = self.client.get(reverse("host_retention_plan", args=[host.host])) self.assertEqual(response.status_code, 200) self.assertContains(response, "Incomplete Snapshots") self.assertContains(response, "20260519-031500Z__BROKEN01") self.assertContains(response, "excluded from retention cleanup") self.assertContains(response, "needs review") self.assertContains(response, "Cleanup is blocked until all incomplete snapshots are reviewed.") self.assertContains(response, "Mark incomplete snapshots reviewed") self.assertContains(response, "delete only incomplete snapshot directories") self.assertNotContains(response, "Delete incomplete snapshots") def test_retention_plan_offers_incomplete_cleanup_after_review(self) -> None: self.client.force_login(self.staff_user) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") SnapshotRecord.objects.create( host=host, kind=SnapshotRecord.Kind.INCOMPLETE, dirname="20260519-031500Z__BROKEN01", path=f"/backups/{host.host}/.incomplete/20260519-031500Z__BROKEN01", status="failed", started_at=datetime(2026, 5, 19, 3, 15, tzinfo=timezone.utc), reviewed_at=datetime(2026, 5, 19, 4, 15, tzinfo=timezone.utc), reviewed_by="admin", ) response = self.client.get(reverse("host_retention_plan", args=[host.host])) self.assertEqual(response.status_code, 200) self.assertContains(response, "reviewed") self.assertContains(response, "Delete incomplete snapshots") self.assertContains(response, "Type 1 to confirm the current number of incomplete snapshots.") self.assertContains(response, "This deletes only reviewed incomplete snapshot directories") self.assertContains(response, 'class="danger"', html=False) def test_incomplete_cleanup_deletes_incomplete_snapshot_after_confirmation(self) -> None: self.client.force_login(self.staff_user) with TemporaryDirectory() as tmp: home = Path(tmp) / "home" host = HostConfig.objects.create(host="web-01", address="web-01.example.test") incomplete_dir = Path(tmp) / "backups" / host.host / ".incomplete" / "20260519-031500Z__BROKEN01" incomplete_dir.mkdir(parents=True) incomplete_dir.joinpath("partial-file").write_text("interrupted\n") record = SnapshotRecord.objects.create( host=host, kind=SnapshotRecord.Kind.INCOMPLETE, dirname=incomplete_dir.name, path=str(incomplete_dir), status="failed", started_at=datetime(2026, 5, 19, 3, 15, tzinfo=timezone.utc), reviewed_at=datetime(2026, 5, 19, 4, 15, tzinfo=timezone.utc), reviewed_by="admin", ) with override_settings(POBSYNC_HOME=str(home)): response = self.client.post( reverse("cleanup_host_incomplete_snapshots", args=[host.host]), { "max_delete": "1", "confirm_host": host.host, "confirm_delete_count": "1", }, follow=True, ) self.assertFalse(incomplete_dir.exists()) self.assertRedirects(response, reverse("host_retention_plan", args=[host.host])) self.assertContains(response, "Deleted 1 incomplete snapshot(s) for web-01.") self.assertFalse(SnapshotRecord.objects.filter(pk=record.pk).exists()) def test_incomplete_cleanup_rejects_bad_confirmation(self) -> None: self.client.force_login(self.staff_user) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") SnapshotRecord.objects.create( host=host, kind=SnapshotRecord.Kind.INCOMPLETE, dirname="20260519-031500Z__BROKEN01", path=f"/backups/{host.host}/.incomplete/20260519-031500Z__BROKEN01", status="failed", started_at=datetime(2026, 5, 19, 3, 15, tzinfo=timezone.utc), ) response = self.client.post( reverse("cleanup_host_incomplete_snapshots", args=[host.host]), { "max_delete": "1", "confirm_host": "wrong", "confirm_delete_count": "1", }, follow=True, ) self.assertRedirects(response, reverse("host_retention_plan", args=[host.host])) self.assertContains(response, "Incomplete cleanup confirmation is invalid.") self.assertEqual(SnapshotRecord.objects.filter(kind=SnapshotRecord.Kind.INCOMPLETE).count(), 1) def test_incomplete_cleanup_rejects_unreviewed_snapshots(self) -> None: self.client.force_login(self.staff_user) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") SnapshotRecord.objects.create( host=host, kind=SnapshotRecord.Kind.INCOMPLETE, dirname="20260519-031500Z__BROKEN01", path=f"/backups/{host.host}/.incomplete/20260519-031500Z__BROKEN01", status="failed", started_at=datetime(2026, 5, 19, 3, 15, tzinfo=timezone.utc), ) with TemporaryDirectory() as tmp, override_settings(POBSYNC_HOME=str(Path(tmp) / "home")): response = self.client.post( reverse("cleanup_host_incomplete_snapshots", args=[host.host]), { "max_delete": "0", "confirm_host": host.host, "confirm_delete_count": "0", }, follow=True, ) self.assertRedirects(response, reverse("host_retention_plan", args=[host.host])) self.assertContains(response, "have not been reviewed") self.assertEqual(SnapshotRecord.objects.filter(kind=SnapshotRecord.Kind.INCOMPLETE).count(), 1) def test_host_detail_surfaces_retention_warnings(self) -> None: self.client.force_login(self.staff_user) host = HostConfig.objects.create( host="web-01", address="web-01.example.test", retention_daily=0, retention_weekly=0, retention_monthly=0, retention_yearly=0, ) ScheduleConfig.objects.create(host=host, cron_expr="15 2 * * *", enabled=True, prune=True, prune_max_delete=1) self._snapshot(host, "20260517-021500Z__OLDSNP1") self._snapshot(host, "20260518-021500Z__OLDSNP2") self._snapshot(host, "20260519-021500Z__NEWSNAP") response = self.client.get(reverse("host_detail", args=[host.host])) self.assertEqual(response.status_code, 200) self.assertContains(response, "Retention Warnings") self.assertContains(response, "Scheduled pruning would delete 2 snapshot(s), above max delete") def test_host_detail_can_mark_incomplete_snapshots_reviewed(self) -> None: self.client.force_login(self.staff_user) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") incomplete = SnapshotRecord.objects.create( host=host, kind=SnapshotRecord.Kind.INCOMPLETE, dirname="20260519-031500Z__BROKEN01", path=f"/backups/{host.host}/.incomplete/20260519-031500Z__BROKEN01", status="failed", started_at=datetime(2026, 5, 19, 3, 15, tzinfo=timezone.utc), ) response = self.client.post(reverse("resolve_host_incomplete_reviews", args=[host.host]), follow=True) incomplete.refresh_from_db() self.assertIsNotNone(incomplete.reviewed_at) self.assertEqual(incomplete.reviewed_by, self.staff_user.username) self.assertRedirects(response, reverse("host_detail", args=[host.host])) self.assertContains(response, "Marked 1 incomplete snapshot(s) reviewed for web-01.") self.assertNotContains(response, "Retention Warnings") def test_host_detail_does_not_warn_for_reviewed_incomplete_snapshots(self) -> None: self.client.force_login(self.staff_user) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") SnapshotRecord.objects.create( host=host, kind=SnapshotRecord.Kind.INCOMPLETE, dirname="20260519-031500Z__BROKEN01", path=f"/backups/{host.host}/.incomplete/20260519-031500Z__BROKEN01", status="failed", started_at=datetime(2026, 5, 19, 3, 15, tzinfo=timezone.utc), reviewed_at=datetime(2026, 5, 19, 4, 15, tzinfo=timezone.utc), reviewed_by="admin", ) response = self.client.get(reverse("host_detail", args=[host.host])) self.assertEqual(response.status_code, 200) self.assertNotContains(response, "Retention Warnings") def test_retention_plan_rejects_invalid_kind(self) -> None: self.client.force_login(self.staff_user) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") response = self.client.get(reverse("host_retention_plan", args=[host.host]), {"kind": "weird"}, follow=True) self.assertRedirects(response, reverse("host_detail", args=[host.host])) self.assertContains(response, "Retention kind must be scheduled, manual, or all.") def test_retention_apply_deletes_planned_snapshot_after_confirmation(self) -> None: self.client.force_login(self.staff_user) with TemporaryDirectory() as tmp: home = Path(tmp) / "home" host = HostConfig.objects.create( host="web-01", address="web-01.example.test", retention_daily=0, retention_weekly=0, retention_monthly=0, retention_yearly=0, ) old_dir = Path(tmp) / "backups" / host.host / "scheduled" / "20260518-021500Z__OLDSNAP" new_dir = Path(tmp) / "backups" / host.host / "scheduled" / "20260519-021500Z__NEWSNAP" old_dir.mkdir(parents=True) new_dir.mkdir(parents=True) old_snapshot = self._snapshot(host, old_dir.name) old_snapshot.path = str(old_dir) old_snapshot.save(update_fields=["path"]) new_snapshot = self._snapshot(host, new_dir.name) new_snapshot.path = str(new_dir) new_snapshot.save(update_fields=["path"]) with override_settings(POBSYNC_HOME=str(home)): response = self.client.post( reverse("apply_host_retention", args=[host.host]), { "kind": "scheduled", "max_delete": "1", "confirm_host": host.host, "confirm_delete_count": "1", }, follow=True, ) self.assertFalse(old_dir.exists()) self.assertTrue(new_dir.exists()) self.assertRedirects(response, f"{reverse('host_retention_plan', args=[host.host])}?kind=scheduled") self.assertContains(response, "Retention deleted 1 snapshot(s) for web-01.") self.assertFalse(SnapshotRecord.objects.filter(pk=old_snapshot.pk).exists()) self.assertTrue(SnapshotRecord.objects.filter(pk=new_snapshot.pk).exists()) purged = PurgedSnapshot.objects.get(dirname=old_snapshot.dirname) self.assertEqual(purged.action, PurgedSnapshot.Action.MANUAL) self.assertEqual(purged.triggered_by, self.staff_user.username) self.assertEqual(purged.reason, "outside retention policy") def test_retention_apply_rejects_bad_confirmation(self) -> None: self.client.force_login(self.staff_user) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") self._snapshot(host, "20260518-021500Z__OLDSNAP") response = self.client.post( reverse("apply_host_retention", args=[host.host]), { "kind": "scheduled", "max_delete": "1", "confirm_host": "wrong", "confirm_delete_count": "1", }, follow=True, ) self.assertRedirects(response, reverse("host_retention_plan", args=[host.host])) self.assertContains(response, "Retention apply confirmation is invalid.") self.assertEqual(SnapshotRecord.objects.count(), 1) def test_retention_apply_rejects_mismatched_delete_count_confirmation(self) -> None: self.client.force_login(self.staff_user) host = HostConfig.objects.create( host="web-01", address="web-01.example.test", retention_daily=0, retention_weekly=0, retention_monthly=0, retention_yearly=0, ) self._snapshot(host, "20260518-021500Z__OLDSNAP") self._snapshot(host, "20260519-021500Z__NEWSNAP") response = self.client.post( reverse("apply_host_retention", args=[host.host]), { "kind": "scheduled", "max_delete": "1", "confirm_host": host.host, "confirm_delete_count": "0", }, follow=True, ) self.assertRedirects(response, reverse("host_retention_plan", args=[host.host])) self.assertContains(response, "Retention apply confirmation is invalid.") self.assertEqual(SnapshotRecord.objects.count(), 2) def test_retention_apply_requires_post(self) -> None: self.client.force_login(self.staff_user) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") response = self.client.get(reverse("apply_host_retention", args=[host.host])) self.assertEqual(response.status_code, 405) def test_schedule_form_renders_defaults_for_new_schedule(self) -> None: self.client.force_login(self.staff_user) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") response = self.client.get(reverse("edit_host_schedule", args=[host.host])) self.assertEqual(response.status_code, 200) self.assertContains(response, "Automatic backup timing and scheduled prune behavior") self.assertContains(response, "Create Schedule") self.assertContains(response, "Schedule expression") self.assertContains(response, "evaluated by the pobsync scheduler service") self.assertContains(response, "15 2 * * *") self.assertContains(response, "Save schedule") self.assertContains(response, "Cancel") self.assertContains(response, reverse("host_detail", args=[host.host])) def test_schedule_form_creates_schedule(self) -> None: self.client.force_login(self.staff_user) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") response = self.client.post( reverse("edit_host_schedule", args=[host.host]), { "cron_expr": "30 3 * * *", "enabled": "on", "prune": "on", "prune_max_delete": "4", "prune_protect_bases": "on", }, follow=True, ) self.assertRedirects(response, reverse("host_detail", args=[host.host])) self.assertContains(response, "Schedule saved for web-01.") schedule = ScheduleConfig.objects.get(host=host) self.assertEqual(schedule.cron_expr, "30 3 * * *") self.assertTrue(schedule.enabled) self.assertTrue(schedule.prune) self.assertEqual(schedule.prune_max_delete, 4) self.assertTrue(schedule.prune_protect_bases) def test_schedule_form_updates_existing_schedule(self) -> None: self.client.force_login(self.staff_user) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") schedule = ScheduleConfig.objects.create(host=host, cron_expr="15 2 * * *", enabled=True, prune=True) response = self.client.post( reverse("edit_host_schedule", args=[host.host]), { "cron_expr": "45 4 * * 1", "prune_max_delete": "8", }, follow=True, ) self.assertRedirects(response, reverse("host_detail", args=[host.host])) schedule.refresh_from_db() self.assertEqual(schedule.cron_expr, "45 4 * * 1") self.assertFalse(schedule.enabled) self.assertFalse(schedule.prune) self.assertEqual(schedule.prune_max_delete, 8) def test_schedule_form_renders_existing_schedule_values(self) -> None: self.client.force_login(self.staff_user) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") ScheduleConfig.objects.create( host=host, cron_expr="45 4 * * 1", enabled=True, prune_max_delete=8, ) response = self.client.get(reverse("edit_host_schedule", args=[host.host])) self.assertEqual(response.status_code, 200) self.assertContains(response, "Edit Schedule") self.assertContains(response, 'value="45 4 * * 1"', html=False) self.assertContains(response, 'value="8"', html=False) self.assertNotContains(response, 'value="15 2 * * *"', html=False) self.assertNotContains(response, ">User<", html=False) def test_schedule_form_rejects_invalid_cron(self) -> None: self.client.force_login(self.staff_user) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") response = self.client.post( reverse("edit_host_schedule", args=[host.host]), { "cron_expr": "bad cron", "enabled": "on", "prune_max_delete": "10", }, ) self.assertEqual(response.status_code, 200) self.assertContains(response, "cron expression must have exactly 5 fields") self.assertFalse(ScheduleConfig.objects.filter(host=host).exists()) def test_host_config_form_renders_existing_values(self) -> None: self.client.force_login(self.staff_user) host = HostConfig.objects.create( host="web-01", address="web-01.example.test", includes=["/srv"], excludes_add=["*.tmp"], rsync_extra_args=["--numeric-ids"], ) response = self.client.get(reverse("edit_host_config", args=[host.host])) self.assertEqual(response.status_code, 200) self.assertContains(response, "Config: web-01") self.assertContains(response, "web-01.example.test") self.assertContains(response, "/srv") self.assertContains(response, "*.tmp") self.assertContains(response, "--numeric-ids") self.assertContains(response, "Cancel") self.assertContains(response, reverse("host_detail", args=[host.host])) def test_host_config_form_renders_effective_config_check(self) -> None: self.client.force_login(self.staff_user) GlobalConfig.objects.create(name="default", backup_root="/backups", rsync_args=["--numeric-ids"]) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") response = self.client.get(reverse("edit_host_config", args=[host.host])) self.assertEqual(response.status_code, 200) self.assertContains(response, "Effective Config Check") self.assertContains(response, "Host effective rsync recursion") self.assertContains(response, "Rsync args do not include archive or recursive transfer.") def test_host_config_form_updates_host_config(self) -> None: self.client.force_login(self.staff_user) host = HostConfig.objects.create(host="web-01", address="old.example.test") response = self.client.post( reverse("edit_host_config", args=[host.host]), { "address": "new.example.test", "enabled": "on", "ssh_user": "backup", "ssh_port": "2222", "source_root": "/srv", "includes": "/srv/www\n/srv/db", "excludes_add": "*.tmp\ncache/", "excludes_replace": "", "rsync_extra_args": "--numeric-ids\n--delete", "rsync_bwlimit_kbps": "8192", "retention_daily": "7", "retention_weekly": "4", "retention_monthly": "2", "retention_yearly": "1", }, follow=True, ) self.assertRedirects(response, reverse("host_detail", args=[host.host])) self.assertContains(response, "Host config saved for web-01.") host.refresh_from_db() self.assertEqual(host.address, "new.example.test") self.assertEqual(host.ssh_user, "backup") self.assertEqual(host.ssh_port, 2222) self.assertEqual(host.source_root, "/srv") self.assertEqual(host.includes, ["/srv/www", "/srv/db"]) self.assertEqual(host.excludes_add, ["*.tmp", "cache/"]) self.assertIsNone(host.excludes_replace) self.assertEqual(host.rsync_extra_args, ["--numeric-ids", "--delete"]) self.assertEqual(host.rsync_bwlimit_kbps, 8192) self.assertEqual(host.retention_daily, 7) self.assertEqual(host.retention_yearly, 1) def test_host_config_form_can_replace_global_excludes(self) -> None: self.client.force_login(self.staff_user) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") response = self.client.post( reverse("edit_host_config", args=[host.host]), { "address": host.address, "ssh_user": "", "ssh_port": "", "source_root": "", "includes": "", "excludes_add": "", "excludes_replace": "*.cache\nnode_modules/", "rsync_extra_args": "", "retention_daily": "14", "retention_weekly": "8", "retention_monthly": "12", "retention_yearly": "0", }, follow=True, ) self.assertRedirects(response, reverse("host_detail", args=[host.host])) host.refresh_from_db() self.assertFalse(host.enabled) self.assertEqual(host.excludes_add, []) self.assertEqual(host.excludes_replace, ["*.cache", "node_modules/"]) def _snapshot( self, host: HostConfig, dirname: str, *, kind: str = SnapshotRecord.Kind.SCHEDULED, ) -> SnapshotRecord: started_at = datetime.strptime(dirname.split("__", 1)[0], "%Y%m%d-%H%M%SZ").replace(tzinfo=timezone.utc) return SnapshotRecord.objects.create( host=host, kind=kind, dirname=dirname, path=f"/backups/{host.host}/{kind}/{dirname}", status="success", started_at=started_at, ) def _set_snapshot_storage(self, snapshot: SnapshotRecord, *, allocated: int) -> None: snapshot.metadata = { "stats": { "storage": { "snapshot": { "apparent_size_bytes": allocated * 2, "allocated_size_bytes": allocated, } } } } snapshot.save(update_fields=["metadata"])