from __future__ import annotations import subprocess from datetime import datetime, timezone from pathlib import Path from tempfile import TemporaryDirectory from unittest.mock import patch from django.contrib.auth import get_user_model from django.core.files.uploadedfile import SimpleUploadedFile from django.test import TestCase, override_settings from django.urls import reverse from pobsync.util import write_yaml_atomic from pobsync_backend.models import BackupRun, GlobalConfig, HostConfig, ScheduleConfig, SnapshotRecord, SshCredential class ViewTests(TestCase): def setUp(self) -> None: user_model = get_user_model() self.staff_user = user_model.objects.create_user( username="admin", password="secret", is_staff=True, is_superuser=True, ) def test_dashboard_requires_staff_login(self) -> None: response = self.client.get(reverse("dashboard")) self.assertEqual(response.status_code, 302) self.assertIn("/admin/login/", response["Location"]) def test_dashboard_renders_hosts_and_latest_runs(self) -> None: self.client.force_login(self.staff_user) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") snapshot = self._snapshot(host, "20260519-021500Z__ABCDEFGH") run = BackupRun.objects.create( host=host, run_type=BackupRun.RunType.MANUAL, status=BackupRun.Status.SUCCESS, snapshot=snapshot, started_at=datetime(2026, 5, 19, 2, 15, tzinfo=timezone.utc), ) response = self.client.get(reverse("dashboard")) self.assertEqual(response.status_code, 200) self.assertContains(response, "Dashboard") self.assertContains(response, "web-01") self.assertContains(response, "20260519-021500Z__ABCDEFGH") self.assertContains(response, "success") self.assertContains(response, f"Run {run.id}") self.assertContains(response, "manual") def test_dashboard_renders_backup_trend_summary(self) -> None: self.client.force_login(self.staff_user) GlobalConfig.objects.create(name="default", backup_root="/missing-backup-root") host = HostConfig.objects.create(host="web-01", address="web-01.example.test") snapshot = self._snapshot(host, "20260519-021500Z__ABCDEFGH") run = BackupRun.objects.create( host=host, run_type=BackupRun.RunType.MANUAL, status=BackupRun.Status.SUCCESS, snapshot=snapshot, started_at=datetime(2026, 5, 19, 2, 15, tzinfo=timezone.utc), result={ "ok": True, "dry_run": False, "stats": { "duration_seconds": 30, "rsync": { "files_total": 100, "literal_data_bytes": 1000, "matched_data_bytes": 4000, }, "storage": { "capacity": { "available_bytes": 10_000, "used_percent": 25.0, } }, }, }, ) ScheduleConfig.objects.create(host=host, cron_expr="* * * * *", enabled=True) response = self.client.get(reverse("dashboard")) self.assertEqual(response.status_code, 200) self.assertContains(response, "Backup Root Used") self.assertContains(response, "Runs Until Full") self.assertContains(response, "Avg Daily New") self.assertContains(response, "Days Until Full") self.assertContains(response, "Next Run") self.assertContains(response, "UTC") self.assertContains(response, "10") self.assertContains(response, f"Run {run.id}") self.assertContains(response, "manual") self.assertContains(response, "1000") def test_dashboard_surfaces_retention_warnings(self) -> None: self.client.force_login(self.staff_user) host = HostConfig.objects.create( host="web-01", address="web-01.example.test", retention_daily=0, retention_weekly=0, retention_monthly=0, retention_yearly=0, ) ScheduleConfig.objects.create(host=host, cron_expr="15 2 * * *", enabled=True, prune=True, prune_max_delete=1) self._snapshot(host, "20260517-021500Z__OLDSNP1") self._snapshot(host, "20260518-021500Z__OLDSNP2") self._snapshot(host, "20260519-021500Z__NEWSNAP") SnapshotRecord.objects.create( host=host, kind=SnapshotRecord.Kind.INCOMPLETE, dirname="20260519-031500Z__BROKEN01", path=f"/backups/{host.host}/.incomplete/20260519-031500Z__BROKEN01", status="failed", started_at=datetime(2026, 5, 19, 3, 15, tzinfo=timezone.utc), ) response = self.client.get(reverse("dashboard")) self.assertEqual(response.status_code, 200) self.assertContains(response, "Scheduled prune would delete 2 snapshot(s), above max 1.") self.assertContains(response, "1 incomplete snapshot(s) need review.") def test_dashboard_links_latest_snapshot_for_each_host(self) -> None: self.client.force_login(self.staff_user) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") old_snapshot = self._snapshot(host, "20260518-021500Z__OLDSNAP") latest_snapshot = self._snapshot(host, "20260519-021500Z__NEWSNAP") response = self.client.get(reverse("dashboard")) self.assertEqual(response.status_code, 200) self.assertContains(response, "Latest Snapshot") self.assertContains(response, latest_snapshot.dirname) self.assertContains(response, reverse("snapshot_detail", args=[latest_snapshot.id])) self.assertNotContains(response, reverse("snapshot_detail", args=[old_snapshot.id])) def test_dashboard_prompts_for_global_config_when_database_is_empty(self) -> None: self.client.force_login(self.staff_user) response = self.client.get(reverse("dashboard")) self.assertEqual(response.status_code, 200) self.assertContains(response, "No default global config exists yet.") self.assertContains(response, reverse("edit_global_config")) self.assertContains(response, "Create global config") def test_dashboard_prompts_for_first_host_after_global_config_exists(self) -> None: self.client.force_login(self.staff_user) GlobalConfig.objects.create(name="default", backup_root="/opt/pobsync/backups") response = self.client.get(reverse("dashboard")) self.assertEqual(response.status_code, 200) self.assertContains(response, "Global config is ready.") self.assertContains(response, reverse("create_host_config")) self.assertContains(response, "Add first host") def test_self_check_renders_runtime_checks(self) -> None: self.client.force_login(self.staff_user) response = self.client.get(reverse("self_check")) self.assertEqual(response.status_code, 200) self.assertContains(response, "Self Check") self.assertContains(response, "Django debug") self.assertContains(response, "Database connection") self.assertContains(response, "POBSYNC_HOME") def test_logs_view_renders_filtered_journal_messages(self) -> None: self.client.force_login(self.staff_user) completed = subprocess.CompletedProcess( args=["journalctl"], returncode=0, stdout=( "2026-05-19 pobsync-worker.service web-01 failed backup run 12\n" "2026-05-19 pobsync-worker.service web-02 failed backup run 12\n" "2026-05-19 pobsync-web.service web-01 started run 12\n" ), stderr="", ) with patch("pobsync_backend.views.shutil.which", return_value="/usr/bin/journalctl"), patch( "pobsync_backend.views.subprocess.run", return_value=completed ) as run: response = self.client.get( reverse("logs"), { "unit": "pobsync-worker.service", "priority": "0..3", "window": "6h", "host": "web-01", "run": "12", "q": "failed", }, ) self.assertEqual(response.status_code, 200) self.assertContains(response, "Logs") self.assertContains(response, "web-01 failed backup run 12") self.assertNotContains(response, "web-02 failed backup run 12") self.assertNotContains(response, "started") command = run.call_args.args[0] self.assertIn("-u", command) self.assertIn("pobsync-worker.service", command) self.assertIn("-p", command) self.assertIn("0..3", command) self.assertIn("--since", command) self.assertIn("6 hours ago", command) def test_ssh_credentials_view_creates_key(self) -> None: self.client.force_login(self.staff_user) with patch("pobsync_backend.forms.validate_ssh_private_key", return_value="DERIVED PUBLIC KEY"): response = self.client.post( reverse("create_ssh_credential"), { "name": "backup-key", "private_key": "PRIVATE KEY", "public_key": "", "known_hosts": "web-01.example.test ssh-ed25519 AAAATEST", "notes": "production backup key", }, follow=True, ) self.assertRedirects(response, reverse("ssh_credentials")) self.assertContains(response, "SSH credential saved for backup-key.") self.assertContains(response, "backup-key") credential = SshCredential.objects.get(name="backup-key") self.assertEqual(credential.private_key, "PRIVATE KEY\n") self.assertEqual(credential.public_key, "DERIVED PUBLIC KEY") def test_ssh_credentials_view_creates_key_from_uploaded_file(self) -> None: self.client.force_login(self.staff_user) uploaded_key = SimpleUploadedFile("id_ed25519", b"UPLOADED PRIVATE KEY\n", content_type="text/plain") with patch("pobsync_backend.forms.validate_ssh_private_key", return_value="DERIVED PUBLIC KEY"): response = self.client.post( reverse("create_ssh_credential"), { "name": "backup-key", "private_key_file": uploaded_key, "private_key": "", "public_key": "", "known_hosts": "", "notes": "uploaded", }, follow=True, ) self.assertRedirects(response, reverse("ssh_credentials")) credential = SshCredential.objects.get(name="backup-key") self.assertEqual(credential.private_key, "UPLOADED PRIVATE KEY\n") self.assertEqual(credential.public_key, "DERIVED PUBLIC KEY") def test_ssh_credentials_view_generates_filesystem_key(self) -> None: self.client.force_login(self.staff_user) with TemporaryDirectory() as tmp, override_settings(POBSYNC_HOME=str(Path(tmp) / "home")): response = self.client.post( reverse("generate_ssh_credential"), { "name": "generated-key", "key_type": "ed25519", "set_global_default": "", "known_hosts": "", "notes": "generated", }, follow=True, ) self.assertRedirects(response, reverse("ssh_credentials")) self.assertContains(response, "SSH key generated for generated-key.") credential = SshCredential.objects.get(name="generated-key") self.assertTrue(credential.generated) self.assertEqual(credential.private_key, "") self.assertTrue(credential.public_key.startswith("ssh-ed25519 ")) self.assertTrue(Path(credential.key_path).exists()) def test_ssh_credentials_view_deletes_unused_generated_key(self) -> None: self.client.force_login(self.staff_user) with TemporaryDirectory() as tmp, override_settings(POBSYNC_HOME=str(Path(tmp) / "home")): credential = SshCredential.objects.create(name="generated-key") from pobsync_backend.ssh_keys import generate_ssh_key generate_ssh_key(credential) key_path = Path(credential.key_path) response = self.client.post(reverse("delete_ssh_credential", args=[credential.id]), follow=True) self.assertRedirects(response, reverse("ssh_credentials")) self.assertContains(response, "SSH key deleted: generated-key.") self.assertFalse(SshCredential.objects.exists()) self.assertFalse(key_path.exists()) def test_ssh_credentials_view_rejects_invalid_key(self) -> None: self.client.force_login(self.staff_user) response = self.client.post( reverse("create_ssh_credential"), { "name": "bad-key", "private_key": "not a private key", "public_key": "", "known_hosts": "", "notes": "", }, ) self.assertEqual(response.status_code, 200) self.assertContains(response, "Invalid SSH private key") self.assertFalse(SshCredential.objects.exists()) def test_ssh_credentials_view_rejects_public_key_in_private_key_field(self) -> None: self.client.force_login(self.staff_user) response = self.client.post( reverse("create_ssh_credential"), { "name": "bad-key", "private_key": "ssh-ed25519 AAAATEST root@backup", "public_key": "", "known_hosts": "", "notes": "", }, ) self.assertEqual(response.status_code, 200) self.assertContains(response, "This looks like a public key") self.assertFalse(SshCredential.objects.exists()) def test_ssh_credentials_view_rejects_mismatched_public_key(self) -> None: self.client.force_login(self.staff_user) with patch("pobsync_backend.forms.validate_ssh_private_key", return_value="ssh-ed25519 AAAADERIVED derived"): response = self.client.post( reverse("create_ssh_credential"), { "name": "bad-key", "private_key": "PRIVATE KEY", "public_key": "ssh-ed25519 AAAAOTHER root@backup", "known_hosts": "", "notes": "", }, ) self.assertEqual(response.status_code, 200) self.assertContains(response, "Public key does not match") self.assertFalse(SshCredential.objects.exists()) def test_ssh_credentials_view_updates_existing_key(self) -> None: self.client.force_login(self.staff_user) credential = SshCredential.objects.create(name="backup-key", private_key="OLD KEY") with patch("pobsync_backend.forms.validate_ssh_private_key", return_value="UPDATED PUBLIC KEY"): response = self.client.post( reverse("edit_ssh_credential", args=[credential.id]), { "name": "backup-key", "private_key": "UPDATED KEY", "public_key": "", "known_hosts": "", "notes": "rotated", }, follow=True, ) self.assertRedirects(response, reverse("ssh_credentials")) credential.refresh_from_db() self.assertEqual(credential.private_key, "UPDATED KEY\n") self.assertEqual(credential.public_key, "UPDATED PUBLIC KEY") self.assertEqual(credential.notes, "rotated") def test_global_config_form_creates_default_config(self) -> None: self.client.force_login(self.staff_user) credential = SshCredential.objects.create(name="backup-key", private_key="PRIVATE KEY") response = self.client.post( reverse("edit_global_config"), { "name": "default", "default_ssh_credential": str(credential.id), "ssh_user": "backup", "ssh_port": "2222", "ssh_options": "StrictHostKeyChecking=no\nBatchMode=yes", "rsync_binary": "rsync", "rsync_args": "-a\n--numeric-ids", "rsync_extra_args": "--delete", "rsync_timeout_seconds": "60", "rsync_bwlimit_kbps": "1000", "default_source_root": "/srv", "default_destination_subdir": "rootfs", "excludes_default": "*.tmp\ncache/", "retention_daily": "7", "retention_weekly": "4", "retention_monthly": "2", "retention_yearly": "1", }, follow=True, ) self.assertRedirects(response, reverse("dashboard")) self.assertContains(response, "Global config saved for default.") config = GlobalConfig.objects.get(name="default") self.assertEqual(config.backup_root, "/backups") self.assertEqual(config.pobsync_home, "/opt/pobsync") self.assertEqual(config.default_ssh_credential, credential) self.assertEqual(config.ssh_user, "backup") self.assertEqual(config.ssh_port, 2222) self.assertEqual(config.ssh_options, ["StrictHostKeyChecking=no", "BatchMode=yes"]) self.assertEqual(config.rsync_args, ["-a", "--numeric-ids"]) self.assertEqual(config.rsync_extra_args, ["--delete"]) self.assertEqual(config.excludes_default, ["*.tmp", "cache/"]) self.assertEqual(config.retention_daily, 7) self.assertEqual(config.retention_yearly, 1) def test_global_config_form_defaults_to_first_generated_key(self) -> None: self.client.force_login(self.staff_user) credential = SshCredential.objects.create(name="default", key_path="/var/lib/pobsync/state/ssh-credentials/1/identity") response = self.client.get(reverse("edit_global_config")) self.assertEqual(response.status_code, 200) self.assertContains(response, f'value="{credential.id}" selected') self.assertContains(response, "--archive") self.assertContains(response, "/proc/***") def test_global_config_form_renders_static_container_backup_root_on_edit(self) -> None: self.client.force_login(self.staff_user) GlobalConfig.objects.create( name="default", backup_root="/mnt/pobsync/backups", pobsync_home="/custom/legacy/home", ) response = self.client.get(reverse("edit_global_config")) self.assertEqual(response.status_code, 200) self.assertContains(response, "Backup root:") self.assertContains(response, "/backups") self.assertContains(response, "Config Check") self.assertContains(response, "Runtime backup root") self.assertNotContains(response, "/opt/pobsync/backups") self.assertNotContains(response, "Pobsync home") def test_global_config_form_renders_config_check_for_non_recursive_rsync(self) -> None: self.client.force_login(self.staff_user) GlobalConfig.objects.create(name="default", backup_root="/backups", rsync_args=["--numeric-ids"]) response = self.client.get(reverse("edit_global_config")) self.assertEqual(response.status_code, 200) self.assertContains(response, "Global rsync recursion") self.assertContains(response, "Rsync args do not include archive or recursive transfer.") def test_global_config_form_resets_backup_root_to_static_container_path(self) -> None: self.client.force_login(self.staff_user) GlobalConfig.objects.create( name="default", backup_root="/mnt/pobsync/backups", pobsync_home="/custom/legacy/home", ) response = self.client.post( reverse("edit_global_config"), { "name": "default", "ssh_user": "root", "ssh_port": "22", "ssh_options": "", "rsync_binary": "rsync", "rsync_args": "", "rsync_extra_args": "", "rsync_timeout_seconds": "0", "rsync_bwlimit_kbps": "0", "default_source_root": "/", "default_destination_subdir": "", "excludes_default": "", "retention_daily": "14", "retention_weekly": "8", "retention_monthly": "12", "retention_yearly": "0", }, follow=True, ) self.assertRedirects(response, reverse("dashboard")) config = GlobalConfig.objects.get(name="default") self.assertEqual(config.backup_root, "/backups") self.assertEqual(config.pobsync_home, "/opt/pobsync") def test_create_host_config_form_creates_host(self) -> None: self.client.force_login(self.staff_user) credential = SshCredential.objects.create(name="host-key", private_key="PRIVATE KEY") response = self.client.post( reverse("create_host_config"), { "host": "web-01", "address": "web-01.example.test", "enabled": "on", "ssh_credential": str(credential.id), "ssh_user": "backup", "ssh_port": "2222", "source_root": "/srv", "includes": "/srv/www\n/srv/db", "excludes_add": "*.tmp", "excludes_replace": "", "rsync_extra_args": "--numeric-ids", "retention_daily": "7", "retention_weekly": "4", "retention_monthly": "2", "retention_yearly": "1", }, follow=True, ) self.assertRedirects(response, reverse("host_detail", args=["web-01"])) self.assertContains(response, "Host config created for web-01.") host = HostConfig.objects.get(host="web-01") self.assertEqual(host.address, "web-01.example.test") self.assertEqual(host.ssh_credential, credential) self.assertEqual(host.ssh_user, "backup") self.assertEqual(host.includes, ["/srv/www", "/srv/db"]) self.assertEqual(host.excludes_add, ["*.tmp"]) self.assertEqual(host.rsync_extra_args, ["--numeric-ids"]) self.assertEqual(host.retention_weekly, 4) def test_create_host_config_uses_global_defaults_and_prepares_directories(self) -> None: self.client.force_login(self.staff_user) credential = SshCredential.objects.create(name="global-key", private_key="PRIVATE KEY") with TemporaryDirectory() as tmp: backup_root = Path(tmp) / "backups" GlobalConfig.objects.create( name="default", backup_root=str(backup_root), default_ssh_credential=credential, ssh_user="backup", ssh_port=2222, default_source_root="/srv", retention_daily=3, retention_weekly=2, retention_monthly=1, retention_yearly=0, ) get_response = self.client.get(reverse("create_host_config")) self.assertContains(get_response, 'value="backup"') self.assertContains(get_response, 'value="2222"') self.assertContains(get_response, 'value="/srv"') response = self.client.post( reverse("create_host_config"), { "host": "web-01", "address": "web-01.example.test", "enabled": "on", "ssh_credential": str(credential.id), "ssh_user": "backup", "ssh_port": "2222", "source_root": "/srv", "includes": "", "excludes_add": "", "excludes_replace": "", "rsync_extra_args": "", "retention_daily": "3", "retention_weekly": "2", "retention_monthly": "1", "retention_yearly": "0", }, follow=True, ) self.assertRedirects(response, reverse("host_detail", args=["web-01"])) self.assertContains(response, "prepared") self.assertTrue((backup_root / "web-01" / "scheduled").is_dir()) self.assertTrue((backup_root / "web-01" / "manual").is_dir()) self.assertTrue((backup_root / "web-01" / ".incomplete").is_dir()) def test_host_detail_renders_config_schedule_runs_and_snapshots(self) -> None: self.client.force_login(self.staff_user) with TemporaryDirectory() as tmp: backup_root = Path(tmp) GlobalConfig.objects.create(name="default", backup_root=str(backup_root), rsync_args=["--archive"]) host = HostConfig.objects.create( host="web-01", address="web-01.example.test", source_root="/srv", retention_daily=7, ) for subdir in ("scheduled", "manual", ".incomplete"): (backup_root / host.host / subdir).mkdir(parents=True) ScheduleConfig.objects.create(host=host, cron_expr="15 2 * * *", prune=True, last_status="success") snapshot = self._snapshot(host, "20260519-021500Z__ABCDEFGH") BackupRun.objects.create(host=host, status=BackupRun.Status.SUCCESS, snapshot=snapshot) response = self.client.get(reverse("host_detail", args=[host.host])) self.assertEqual(response.status_code, 200) self.assertContains(response, "web-01") self.assertContains(response, "web-01.example.test") self.assertContains(response, "15 2 * * *") self.assertContains(response, "Schedule expression") self.assertContains(response, "Evaluated by the pobsync scheduler service.") self.assertContains(response, "Next run:") self.assertContains(response, "UTC") self.assertContains(response, "20260519-021500Z__ABCDEFGH") self.assertContains(response, "Discover snapshots") self.assertContains(response, "Edit schedule") self.assertContains(response, "Edit config") self.assertContains(response, "Run connection preflight") self.assertContains(response, "Backup Control") self.assertContains(response, "Queue dry-run") self.assertContains(response, "Queue backup") self.assertContains(response, "Host Check") self.assertContains(response, reverse("prepare_host_directories", args=[host.host])) self.assertContains(response, "warning") self.assertContains(response, "Snapshot Discovery") self.assertContains(response, reverse("queue_manual_backup", args=[host.host])) self.assertContains(response, reverse("run_detail", args=[BackupRun.objects.get().id])) self.assertContains(response, reverse("snapshot_detail", args=[snapshot.id])) def test_host_detail_renders_effective_config_preview(self) -> None: self.client.force_login(self.staff_user) credential = SshCredential.objects.create(name="default-key", key_path="/var/lib/pobsync/id_ed25519") GlobalConfig.objects.create( name="default", backup_root="/backups", default_ssh_credential=credential, ssh_user="root", ssh_port=2222, ssh_options=["-oBatchMode=yes"], rsync_args=["--archive", "--numeric-ids"], rsync_extra_args=["--delete"], rsync_timeout_seconds=300, rsync_bwlimit_kbps=2048, default_source_root="/", excludes_default=["/proc/***", "/sys/***"], retention_daily=14, retention_weekly=4, retention_monthly=2, retention_yearly=1, ) host = HostConfig.objects.create( host="web-01", address="web-01.example.test", includes=["/srv/www/***"], excludes_add=["/srv/www/cache/***"], rsync_extra_args=["--one-file-system"], ) response = self.client.get(reverse("host_detail", args=[host.host])) self.assertEqual(response.status_code, 200) self.assertContains(response, "Effective Config") self.assertContains(response, "root@web-01.example.test:2222") self.assertContains(response, "default-key") self.assertContains(response, "-oBatchMode=yes") self.assertContains(response, "--archive --numeric-ids --delete --one-file-system") self.assertContains(response, "/srv/www/***") self.assertContains(response, "/srv/www/cache/***") self.assertContains(response, "d14") self.assertContains(response, "w8") def test_run_host_preflight_stores_remote_check_result(self) -> None: self.client.force_login(self.staff_user) GlobalConfig.objects.create(name="default", backup_root="/backups", rsync_args=["--archive"]) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") with patch( "pobsync_backend.preflight.subprocess.run", return_value=subprocess.CompletedProcess(args=["ssh"], returncode=0, stdout="", stderr=""), ) as run: response = self.client.post(reverse("run_host_preflight", args=[host.host]), follow=True) self.assertRedirects(response, reverse("host_detail", args=[host.host])) self.assertContains(response, "Connection preflight passed for web-01.") self.assertContains(response, "Connection Preflight") self.assertContains(response, "SSH reachability") self.assertContains(response, "Remote rsync") self.assertContains(response, "Remote source root") self.assertEqual(run.call_count, 3) host.refresh_from_db() self.assertTrue(host.config["last_preflight"]["ok"]) self.assertEqual(host.config["last_preflight"]["target"], "root@web-01.example.test") def test_queue_manual_backup_blocks_real_backup_after_failed_remote_preflight(self) -> None: self.client.force_login(self.staff_user) with TemporaryDirectory() as tmp: backup_root = Path(tmp) GlobalConfig.objects.create(name="default", backup_root=str(backup_root), rsync_args=["--archive"]) host = HostConfig.objects.create( host="web-01", address="web-01.example.test", config={ "last_preflight": { "ok": False, "target": "root@web-01.example.test", "source_root": "/", "rsync_binary": "rsync", "checks": [ { "name": "Remote rsync", "ok": False, "exit_code": 127, "message": "Remote rsync failed.", "detail": "rsync missing", } ], } }, ) for subdir in ("scheduled", "manual", ".incomplete"): (backup_root / host.host / subdir).mkdir(parents=True) response = self.client.post( reverse("queue_manual_backup", args=[host.host]), {"prune_max_delete": "10"}, follow=True, ) self.assertRedirects(response, reverse("host_detail", args=[host.host])) self.assertContains(response, "Cannot queue real backup until failed preflight checks are resolved") self.assertContains(response, "Remote preflight") self.assertFalse(BackupRun.objects.exists()) def test_host_detail_renders_backup_trends(self) -> None: self.client.force_login(self.staff_user) GlobalConfig.objects.create(name="default", backup_root="/backups") host = HostConfig.objects.create(host="web-01", address="web-01.example.test") snapshot = self._snapshot(host, "20260519-021500Z__ABCDEFGH") BackupRun.objects.create( host=host, run_type=BackupRun.RunType.MANUAL, status=BackupRun.Status.SUCCESS, snapshot=snapshot, started_at=datetime(2026, 5, 19, 2, 15, tzinfo=timezone.utc), result={ "ok": True, "dry_run": False, "stats": { "duration_seconds": 45, "rsync": { "files_total": 250, "literal_data_bytes": 2048, "matched_data_bytes": 8192, }, }, }, ) BackupRun.objects.create( host=host, status=BackupRun.Status.SUCCESS, snapshot=snapshot, started_at=datetime(2026, 5, 18, 2, 15, tzinfo=timezone.utc), result={ "ok": True, "dry_run": False, "stats": { "duration_seconds": 35, "rsync": { "files_total": 150, "literal_data_bytes": 1024, "matched_data_bytes": 4096, }, }, }, ) response = self.client.get(reverse("host_detail", args=[host.host])) self.assertEqual(response.status_code, 200) self.assertContains(response, "Backup Trends") self.assertContains(response, "Avg New Data") self.assertContains(response, "Avg Daily New") self.assertContains(response, "manual") self.assertContains(response, "45s") self.assertContains(response, "250") self.assertContains(response, "2.0") self.assertContains(response, "KB") self.assertContains(response, "Run data trend") self.assertContains(response, "width: 100%") self.assertContains(response, "width: 50%") def test_prepare_host_directories_action_creates_missing_directories(self) -> None: self.client.force_login(self.staff_user) with TemporaryDirectory() as tmp: backup_root = Path(tmp) / "backups" GlobalConfig.objects.create(name="default", backup_root=str(backup_root)) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") response = self.client.post(reverse("prepare_host_directories", args=[host.host]), follow=True) self.assertRedirects(response, reverse("host_detail", args=[host.host])) self.assertContains(response, "Prepared backup directories") self.assertTrue((backup_root / host.host / "scheduled").is_dir()) self.assertTrue((backup_root / host.host / "manual").is_dir()) self.assertTrue((backup_root / host.host / ".incomplete").is_dir()) def test_host_detail_warns_when_rsync_is_not_recursive(self) -> None: self.client.force_login(self.staff_user) GlobalConfig.objects.create(name="default", backup_root="/backups", rsync_args=[]) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") response = self.client.get(reverse("host_detail", args=[host.host])) self.assertEqual(response.status_code, 200) self.assertContains(response, "Host effective rsync recursion") self.assertContains(response, "Rsync args do not include archive or recursive transfer.") def test_host_detail_warns_when_replace_excludes_drops_root_defaults(self) -> None: self.client.force_login(self.staff_user) GlobalConfig.objects.create( name="default", backup_root="/backups", rsync_args=["--archive"], excludes_default=["/proc/***", "/sys/***", "/dev/***", "/run/***", "/tmp/***"], ) host = HostConfig.objects.create( host="web-01", address="web-01.example.test", excludes_replace=[], ) response = self.client.get(reverse("host_detail", args=[host.host])) self.assertEqual(response.status_code, 200) self.assertContains(response, "Effective root excludes") self.assertContains(response, "excludes_replace is active") def test_host_detail_warns_that_includes_are_raw_rsync_rules(self) -> None: self.client.force_login(self.staff_user) GlobalConfig.objects.create(name="default", backup_root="/backups", rsync_args=["--archive"]) host = HostConfig.objects.create( host="web-01", address="web-01.example.test", includes=["/srv/www"], ) response = self.client.get(reverse("host_detail", args=[host.host])) self.assertEqual(response.status_code, 200) self.assertContains(response, "Host includes") self.assertContains(response, "Includes are passed to rsync as raw --include rules.") def test_scan_host_known_key_action_updates_selected_credential(self) -> None: self.client.force_login(self.staff_user) credential = SshCredential.objects.create(name="default-key", key_path="/var/lib/pobsync/state/ssh-credentials/1/identity") GlobalConfig.objects.create(name="default", backup_root="/backups", default_ssh_credential=credential, ssh_port=2222) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") with patch( "pobsync_backend.views.scan_known_host", return_value="web-01.example.test ssh-ed25519 AAAASCANNED", ) as scan: response = self.client.post(reverse("scan_host_known_key", args=[host.host]), follow=True) self.assertRedirects(response, reverse("host_detail", args=[host.host])) self.assertContains(response, "Stored SSH host key for web-01") scan.assert_called_once_with("web-01.example.test", port=2222) credential.refresh_from_db() self.assertEqual(credential.known_hosts, "web-01.example.test ssh-ed25519 AAAASCANNED\n") def test_host_detail_surfaces_active_backup_run(self) -> None: self.client.force_login(self.staff_user) GlobalConfig.objects.create(name="default", backup_root="/backups") host = HostConfig.objects.create(host="web-01", address="web-01.example.test") run = BackupRun.objects.create(host=host, status=BackupRun.Status.QUEUED) response = self.client.get(reverse("host_detail", args=[host.host])) self.assertEqual(response.status_code, 200) self.assertContains(response, "queued") self.assertContains(response, f"Run {run.id}") self.assertContains(response, reverse("run_detail", args=[run.id])) def test_host_detail_disables_backup_controls_without_global_config(self) -> None: self.client.force_login(self.staff_user) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") response = self.client.get(reverse("host_detail", args=[host.host])) self.assertEqual(response.status_code, 200) self.assertContains(response, "missing global config") self.assertContains(response, "Create the default global config before queueing backups.") def test_host_detail_renders_discovery_status_for_existing_snapshot_dirs(self) -> None: self.client.force_login(self.staff_user) with TemporaryDirectory() as tmp: backup_root = Path(tmp) / "backups" GlobalConfig.objects.create(name="default", backup_root=str(backup_root)) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") (backup_root / host.host / "scheduled" / "20260519-021500Z__ABCDEFGH").mkdir(parents=True) (backup_root / host.host / ".incomplete" / "20260519-031500Z__BROKEN01").mkdir(parents=True) response = self.client.get(reverse("host_detail", args=[host.host])) self.assertEqual(response.status_code, 200) self.assertContains(response, f"Host root: {backup_root / host.host}") self.assertContains(response, "Found 2 snapshot directories") self.assertContains(response, "scheduled 1") self.assertContains(response, "incomplete 1") def test_host_detail_returns_404_for_unknown_host(self) -> None: self.client.force_login(self.staff_user) response = self.client.get(reverse("host_detail", args=["missing-host"])) self.assertEqual(response.status_code, 404) def test_queue_manual_backup_creates_queued_run_and_redirects_to_run_detail(self) -> None: self.client.force_login(self.staff_user) GlobalConfig.objects.create(name="default", backup_root="/backups", rsync_args=["--archive"]) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") response = self.client.post( reverse("queue_manual_backup", args=[host.host]), { "dry_run": "on", "verbose_output": "on", "prune": "on", "prune_max_delete": "4", "prune_protect_bases": "on", }, follow=True, ) run = BackupRun.objects.get(host=host) self.assertRedirects(response, reverse("run_detail", args=[run.id])) self.assertContains(response, f"Queued manual backup run {run.id} for web-01.") self.assertEqual(run.status, BackupRun.Status.QUEUED) self.assertEqual(run.run_type, BackupRun.RunType.MANUAL) self.assertEqual( run.result["requested"], { "dry_run": True, "verbose_output": True, "prune": True, "prune_max_delete": 4, "prune_protect_bases": True, }, ) def test_queue_manual_backup_quick_action_can_queue_real_backup(self) -> None: self.client.force_login(self.staff_user) with TemporaryDirectory() as tmp: backup_root = Path(tmp) GlobalConfig.objects.create(name="default", backup_root=str(backup_root), rsync_args=["--archive"]) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") for subdir in ("scheduled", "manual", ".incomplete"): (backup_root / host.host / subdir).mkdir(parents=True) response = self.client.post( reverse("queue_manual_backup", args=[host.host]), {"prune_max_delete": "10"}, follow=True, ) run = BackupRun.objects.get(host=host) self.assertRedirects(response, reverse("run_detail", args=[run.id])) self.assertEqual( run.result["requested"], { "dry_run": False, "verbose_output": False, "prune": False, "prune_max_delete": 10, "prune_protect_bases": False, }, ) def test_queue_manual_backup_blocks_real_backup_when_host_directories_are_missing(self) -> None: self.client.force_login(self.staff_user) with TemporaryDirectory() as tmp: backup_root = Path(tmp) GlobalConfig.objects.create(name="default", backup_root=str(backup_root), rsync_args=["--archive"]) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") response = self.client.post( reverse("queue_manual_backup", args=[host.host]), {"prune_max_delete": "10"}, follow=True, ) self.assertRedirects(response, reverse("host_detail", args=[host.host])) self.assertContains(response, "Cannot queue real backup until failed preflight checks are resolved") self.assertContains(response, "Host backup root") self.assertFalse(BackupRun.objects.exists()) def test_queue_manual_backup_allows_dry_run_with_only_storage_preflight_failures(self) -> None: self.client.force_login(self.staff_user) with TemporaryDirectory() as tmp: backup_root = Path(tmp) GlobalConfig.objects.create(name="default", backup_root=str(backup_root), rsync_args=["--archive"]) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") response = self.client.post( reverse("queue_manual_backup", args=[host.host]), {"dry_run": "on", "verbose_output": "on", "prune_max_delete": "10"}, follow=True, ) run = BackupRun.objects.get(host=host) self.assertRedirects(response, reverse("run_detail", args=[run.id])) self.assertEqual(run.result["requested"]["dry_run"], True) def test_queue_manual_backup_requires_default_global_config(self) -> None: self.client.force_login(self.staff_user) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") response = self.client.post(reverse("queue_manual_backup", args=[host.host]), {"dry_run": "on"}, follow=True) self.assertRedirects(response, reverse("host_detail", args=[host.host])) self.assertContains(response, "Create the default global config before queueing backups.") self.assertFalse(BackupRun.objects.exists()) def test_queue_manual_backup_rejects_disabled_host(self) -> None: self.client.force_login(self.staff_user) GlobalConfig.objects.create(name="default", backup_root="/backups", rsync_args=["--archive"]) host = HostConfig.objects.create(host="web-01", address="web-01.example.test", enabled=False) response = self.client.post(reverse("queue_manual_backup", args=[host.host]), {"dry_run": "on"}, follow=True) self.assertRedirects(response, reverse("host_detail", args=[host.host])) self.assertContains(response, "Cannot queue backup for disabled host web-01.") self.assertFalse(BackupRun.objects.exists()) def test_queue_manual_backup_requires_post(self) -> None: self.client.force_login(self.staff_user) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") response = self.client.get(reverse("queue_manual_backup", args=[host.host])) self.assertEqual(response.status_code, 405) def test_run_detail_renders_result_payload(self) -> None: self.client.force_login(self.staff_user) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") snapshot = self._snapshot(host, "20260519-021500Z__ABCDEFGH") run = BackupRun.objects.create( host=host, status=BackupRun.Status.SUCCESS, snapshot=snapshot, snapshot_path=snapshot.path, base_path="/backups/web-01/scheduled/base", rsync_exit_code=0, result={ "ok": True, "snapshot": snapshot.path, "rsync": { "command": ["rsync", "--archive", "root@web-01:/", snapshot.path], "exit_code": 0, "log_tail": ["sending incremental file list", "sent 500 bytes"], }, "requested": { "dry_run": True, "verbose_output": True, "prune": False, "prune_max_delete": 10, "prune_protect_bases": False, }, "stats": { "duration_seconds": 12, "rsync": { "files_total": 10, "files_transferred": 2, "total_file_size_bytes": 2000, "total_transferred_file_size_bytes": 500, "literal_data_bytes": 500, "matched_data_bytes": 1500, "link_dest_estimated_savings_bytes": 1500, }, }, }, ) response = self.client.get(reverse("run_detail", args=[run.id])) self.assertEqual(response.status_code, 200) self.assertContains(response, "Run") self.assertContains(response, "web-01") self.assertContains(response, "success") self.assertContains(response, "ABCDEFGH") self.assertContains(response, "Requested Options") self.assertContains(response, "Dry run: yes") self.assertContains(response, "Verbose rsync output: yes") self.assertContains(response, "Rsync Command") self.assertContains(response, "--archive") self.assertContains(response, "Rsync Log") self.assertContains(response, "sending incremental file list") self.assertContains(response, "Dry Run Summary") self.assertContains(response, "Files Seen") self.assertContains(response, "Would Transfer") self.assertContains(response, "Transfer Estimate") self.assertContains(response, "Warnings: none recorded") self.assertContains(response, "Stats") self.assertContains(response, "Files seen: 10") self.assertContains(response, "Estimated link-dest saving") self.assertContains(response, ""ok": true") self.assertContains(response, reverse("snapshot_detail", args=[snapshot.id])) def test_run_detail_surfaces_dry_run_warnings_and_log_link(self) -> None: self.client.force_login(self.staff_user) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") with TemporaryDirectory() as tmp: log_path = Path(tmp) / "dry-run" / "rsync.log" log_path.parent.mkdir(parents=True) log_path.write_text("WARNING: noisy shell output\npermission denied\n", encoding="utf-8") run = BackupRun.objects.create( host=host, status=BackupRun.Status.FAILED, rsync_exit_code=255, result={ "ok": False, "dry_run": True, "log": str(log_path), "failure": { "category": "transport", "message": "Rsync transport failed.", "hint": "Check SSH access.", }, "stats": { "duration_seconds": 4, "rsync": { "files_total": 25, "files_transferred": 3, "total_file_size_bytes": 10_000, "total_transferred_file_size_bytes": 1_500, }, }, "rsync": { "exit_code": 255, "log_tail": ["WARNING: noisy shell output", "permission denied"], }, }, ) response = self.client.get(reverse("run_detail", args=[run.id])) self.assertEqual(response.status_code, 200) self.assertContains(response, "Dry Run Summary") self.assertContains(response, "failed") self.assertContains(response, "Files Seen") self.assertContains(response, "25") self.assertContains(response, "Would Transfer") self.assertContains(response, "3") self.assertContains(response, "1.5") self.assertContains(response, "Open full rsync log") self.assertContains(response, reverse("run_rsync_log", args=[run.id])) self.assertContains(response, "Rsync transport failed.") self.assertContains(response, "Check SSH access.") self.assertContains(response, "WARNING: noisy shell output") def test_run_detail_links_existing_rsync_log(self) -> None: self.client.force_login(self.staff_user) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") with TemporaryDirectory() as tmp: log_path = Path(tmp) / "snapshot" / "meta" / "rsync.log" log_path.parent.mkdir(parents=True) log_path.write_text("old line\nrsync log line\n", encoding="utf-8") run = BackupRun.objects.create( host=host, status=BackupRun.Status.SUCCESS, snapshot_path=str(log_path.parent.parent), result={"ok": True, "log": str(log_path)}, ) response = self.client.get(reverse("run_detail", args=[run.id])) log_response = self.client.get(reverse("run_rsync_log", args=[run.id])) log_body = b"".join(log_response.streaming_content) self.assertContains(response, reverse("run_rsync_log", args=[run.id])) self.assertContains(response, str(log_path)) self.assertContains(response, "rsync log line") self.assertEqual(log_response.status_code, 200) self.assertEqual(log_body, b"old line\nrsync log line\n") def test_run_detail_surfaces_failure_and_retention_warning(self) -> None: self.client.force_login(self.staff_user) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") run = BackupRun.objects.create( host=host, status=BackupRun.Status.WARNING, rsync_exit_code=0, result={ "ok": True, "failure": { "category": "transport", "summary": "SSH connection dropped.", "hint": "Check network connectivity.", }, "prune": { "ok": False, "type": "ConfigError", "error": "Deletion blocked by --max-delete=0", }, }, ) response = self.client.get(reverse("run_detail", args=[run.id])) self.assertEqual(response.status_code, 200) self.assertContains(response, "Failure") self.assertContains(response, "transport") self.assertContains(response, "Check network connectivity.") self.assertContains(response, "Retention") self.assertContains(response, "Deletion blocked by --max-delete=0") def test_run_detail_surfaces_host_retention_warnings(self) -> None: self.client.force_login(self.staff_user) host = HostConfig.objects.create( host="web-01", address="web-01.example.test", retention_daily=0, retention_weekly=0, retention_monthly=0, retention_yearly=0, ) ScheduleConfig.objects.create(host=host, cron_expr="15 2 * * *", enabled=True, prune=True, prune_max_delete=1) self._snapshot(host, "20260517-021500Z__OLDSNP1") self._snapshot(host, "20260518-021500Z__OLDSNP2") self._snapshot(host, "20260519-021500Z__NEWSNAP") SnapshotRecord.objects.create( host=host, kind=SnapshotRecord.Kind.INCOMPLETE, dirname="20260519-031500Z__BROKEN01", path=f"/backups/{host.host}/.incomplete/20260519-031500Z__BROKEN01", status="failed", started_at=datetime(2026, 5, 19, 3, 15, tzinfo=timezone.utc), ) run = BackupRun.objects.create(host=host, status=BackupRun.Status.SUCCESS, result={"ok": True}) response = self.client.get(reverse("run_detail", args=[run.id])) self.assertEqual(response.status_code, 200) self.assertContains(response, "Retention Warnings") self.assertContains(response, "Scheduled pruning for this host would delete 2 snapshot(s)") self.assertContains(response, "1 incomplete snapshot(s) exist for this host") def test_run_detail_infers_rsync_log_from_snapshot_path(self) -> None: self.client.force_login(self.staff_user) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") with TemporaryDirectory() as tmp: snapshot_path = Path(tmp) / "snapshot" log_path = snapshot_path / "meta" / "rsync.log" log_path.parent.mkdir(parents=True) log_path.write_text("scheduled log\n", encoding="utf-8") run = BackupRun.objects.create( host=host, status=BackupRun.Status.SUCCESS, snapshot_path=str(snapshot_path), result={"ok": True}, ) response = self.client.get(reverse("run_rsync_log", args=[run.id])) body = b"".join(response.streaming_content) self.assertEqual(response.status_code, 200) self.assertEqual(body, b"scheduled log\n") def test_run_detail_offers_cancel_for_running_run(self) -> None: self.client.force_login(self.staff_user) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") run = BackupRun.objects.create(host=host, status=BackupRun.Status.RUNNING) response = self.client.get(reverse("run_detail", args=[run.id])) self.assertEqual(response.status_code, 200) self.assertContains(response, "Cancel run") self.assertContains(response, reverse("cancel_run", args=[run.id])) def test_cancel_run_marks_queued_run_cancelled(self) -> None: self.client.force_login(self.staff_user) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") run = BackupRun.objects.create(host=host, status=BackupRun.Status.QUEUED) response = self.client.post(reverse("cancel_run", args=[run.id]), follow=True) self.assertRedirects(response, reverse("run_detail", args=[run.id])) self.assertContains(response, "Cancellation requested") run.refresh_from_db() self.assertEqual(run.status, BackupRun.Status.CANCELLED) self.assertIsNotNone(run.ended_at) self.assertEqual(run.result["cancellation"]["previous_status"], BackupRun.Status.QUEUED) def test_cancel_run_requests_running_run_cancellation(self) -> None: self.client.force_login(self.staff_user) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") run = BackupRun.objects.create(host=host, status=BackupRun.Status.RUNNING) response = self.client.post(reverse("cancel_run", args=[run.id]), follow=True) self.assertRedirects(response, reverse("run_detail", args=[run.id])) run.refresh_from_db() self.assertEqual(run.status, BackupRun.Status.CANCELLED) self.assertIsNone(run.ended_at) self.assertEqual(run.result["cancellation"]["previous_status"], BackupRun.Status.RUNNING) def test_snapshot_detail_renders_metadata_runs_and_children(self) -> None: self.client.force_login(self.staff_user) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") base = self._snapshot(host, "20260518-021500Z__BASESNAP") base.metadata = { "status": "success", "snapshot_id": "BASESNAP", "stats": { "duration_seconds": 20, "rsync": { "files_total": 100, "files_transferred": 4, "total_file_size_bytes": 10_000, "link_dest_estimated_savings_bytes": 7_000, }, "storage": { "snapshot": { "apparent_size_bytes": 10_000, "allocated_size_bytes": 3_000, "hardlinked_files": 9, }, "capacity": { "used_percent": 30.5, "available_bytes": 1_000_000, }, }, }, } base.save(update_fields=["metadata"]) child = self._snapshot(host, "20260519-021500Z__CHILDSNP") child.base = base child.base_dirname = base.dirname child.save(update_fields=["base", "base_dirname"]) run = BackupRun.objects.create(host=host, status=BackupRun.Status.SUCCESS, snapshot=base) response = self.client.get(reverse("snapshot_detail", args=[base.id])) self.assertEqual(response.status_code, 200) self.assertContains(response, base.dirname) self.assertContains(response, "BASESNAP") self.assertContains(response, "Stats") self.assertContains(response, "Files seen: 100") self.assertContains(response, "Hardlinked files: 9") self.assertContains(response, child.dirname) self.assertContains(response, f"Run {run.id}") self.assertContains(response, reverse("run_detail", args=[run.id])) self.assertContains(response, reverse("snapshot_detail", args=[child.id])) def test_discover_host_snapshots_action_discovers_and_redirects(self) -> None: self.client.force_login(self.staff_user) with TemporaryDirectory() as tmp: backup_root = Path(tmp) / "backups" GlobalConfig.objects.create(name="default", backup_root=str(backup_root)) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") snapshot_dir = backup_root / host.host / "scheduled" / "20260519-021500Z__ABCDEFGH" meta_dir = snapshot_dir / "meta" meta_dir.mkdir(parents=True) write_yaml_atomic(meta_dir / "meta.yaml", {"status": "success", "started_at": "2026-05-19T02:15:00Z"}) response = self.client.post(reverse("discover_host_snapshots", args=[host.host]), follow=True) self.assertRedirects(response, reverse("host_detail", args=[host.host])) self.assertContains(response, "Snapshot discovery scanned 1 items") self.assertTrue(SnapshotRecord.objects.filter(host=host, dirname=snapshot_dir.name).exists()) def test_discover_host_snapshots_warns_when_host_root_is_missing(self) -> None: self.client.force_login(self.staff_user) with TemporaryDirectory() as tmp: backup_root = Path(tmp) / "backups" GlobalConfig.objects.create(name="default", backup_root=str(backup_root)) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") response = self.client.post(reverse("discover_host_snapshots", args=[host.host]), follow=True) self.assertRedirects(response, reverse("host_detail", args=[host.host])) self.assertContains(response, "Snapshot discovery scanned 0 items") self.assertContains(response, "Host backup directory does not exist yet") self.assertFalse(SnapshotRecord.objects.exists()) def test_discover_host_snapshots_requires_post(self) -> None: self.client.force_login(self.staff_user) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") response = self.client.get(reverse("discover_host_snapshots", args=[host.host])) self.assertEqual(response.status_code, 405) def test_retention_plan_renders_keep_and_delete_sets(self) -> None: self.client.force_login(self.staff_user) host = HostConfig.objects.create( host="web-01", address="web-01.example.test", retention_daily=0, retention_weekly=0, retention_monthly=0, retention_yearly=0, ) old_snapshot = self._snapshot(host, "20260518-021500Z__OLDSNAP") new_snapshot = self._snapshot(host, "20260519-021500Z__NEWSNAP") response = self.client.get(reverse("host_retention_plan", args=[host.host])) self.assertEqual(response.status_code, 200) self.assertContains(response, "Retention Plan: web-01") self.assertContains(response, old_snapshot.dirname) self.assertContains(response, new_snapshot.dirname) self.assertContains(response, "newest") self.assertContains(response, "Would Delete") self.assertContains(response, "outside retention policy") def test_retention_plan_warns_when_scheduled_prune_limit_is_exceeded(self) -> None: self.client.force_login(self.staff_user) host = HostConfig.objects.create( host="web-01", address="web-01.example.test", retention_daily=0, retention_weekly=0, retention_monthly=0, retention_yearly=0, ) ScheduleConfig.objects.create(host=host, cron_expr="15 2 * * *", enabled=True, prune=True, prune_max_delete=1) self._snapshot(host, "20260517-021500Z__OLDSNP1") self._snapshot(host, "20260518-021500Z__OLDSNP2") self._snapshot(host, "20260519-021500Z__NEWSNAP") response = self.client.get(reverse("host_retention_plan", args=[host.host])) self.assertEqual(response.status_code, 200) self.assertContains(response, "Scheduled Prune Limit") self.assertContains(response, "would delete 2 snapshot(s)") self.assertContains(response, "scheduled prune limit of") self.assertContains(response, "Schedule max delete: 1") def test_retention_plan_can_enable_base_protection(self) -> None: self.client.force_login(self.staff_user) host = HostConfig.objects.create( host="web-01", address="web-01.example.test", retention_daily=0, retention_weekly=0, retention_monthly=0, retention_yearly=0, ) base = self._snapshot(host, "20260518-021500Z__BASESNAP") child = self._snapshot(host, "20260519-021500Z__CHILDSNP") child.base = base child.save(update_fields=["base"]) response = self.client.get(reverse("host_retention_plan", args=[host.host]), {"protect_bases": "1"}) self.assertEqual(response.status_code, 200) self.assertContains(response, "Protect bases: yes") self.assertContains(response, "Base snapshots referenced by kept snapshots") self.assertContains(response, f"base-of:{child.dirname}") def test_retention_plan_surfaces_incomplete_snapshots_without_deleting_them(self) -> None: self.client.force_login(self.staff_user) host = HostConfig.objects.create( host="web-01", address="web-01.example.test", retention_daily=0, retention_weekly=0, retention_monthly=0, retention_yearly=0, ) self._snapshot(host, "20260518-021500Z__OLDSNAP") self._snapshot(host, "20260519-021500Z__NEWSNAP") SnapshotRecord.objects.create( host=host, kind=SnapshotRecord.Kind.INCOMPLETE, dirname="20260519-031500Z__BROKEN01", path=f"/backups/{host.host}/.incomplete/20260519-031500Z__BROKEN01", status="failed", started_at=datetime(2026, 5, 19, 3, 15, tzinfo=timezone.utc), ) response = self.client.get(reverse("host_retention_plan", args=[host.host])) self.assertEqual(response.status_code, 200) self.assertContains(response, "Incomplete Snapshots") self.assertContains(response, "20260519-031500Z__BROKEN01") self.assertContains(response, "excluded from retention cleanup") def test_host_detail_surfaces_retention_warnings(self) -> None: self.client.force_login(self.staff_user) host = HostConfig.objects.create( host="web-01", address="web-01.example.test", retention_daily=0, retention_weekly=0, retention_monthly=0, retention_yearly=0, ) ScheduleConfig.objects.create(host=host, cron_expr="15 2 * * *", enabled=True, prune=True, prune_max_delete=1) self._snapshot(host, "20260517-021500Z__OLDSNP1") self._snapshot(host, "20260518-021500Z__OLDSNP2") self._snapshot(host, "20260519-021500Z__NEWSNAP") response = self.client.get(reverse("host_detail", args=[host.host])) self.assertEqual(response.status_code, 200) self.assertContains(response, "Retention Warnings") self.assertContains(response, "Scheduled pruning would delete 2 snapshot(s), above max delete") def test_retention_plan_rejects_invalid_kind(self) -> None: self.client.force_login(self.staff_user) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") response = self.client.get(reverse("host_retention_plan", args=[host.host]), {"kind": "weird"}, follow=True) self.assertRedirects(response, reverse("host_detail", args=[host.host])) self.assertContains(response, "Retention kind must be scheduled, manual, or all.") def test_retention_apply_deletes_planned_snapshot_after_confirmation(self) -> None: self.client.force_login(self.staff_user) with TemporaryDirectory() as tmp: home = Path(tmp) / "home" host = HostConfig.objects.create( host="web-01", address="web-01.example.test", retention_daily=0, retention_weekly=0, retention_monthly=0, retention_yearly=0, ) old_dir = Path(tmp) / "backups" / host.host / "scheduled" / "20260518-021500Z__OLDSNAP" new_dir = Path(tmp) / "backups" / host.host / "scheduled" / "20260519-021500Z__NEWSNAP" old_dir.mkdir(parents=True) new_dir.mkdir(parents=True) old_snapshot = self._snapshot(host, old_dir.name) old_snapshot.path = str(old_dir) old_snapshot.save(update_fields=["path"]) new_snapshot = self._snapshot(host, new_dir.name) new_snapshot.path = str(new_dir) new_snapshot.save(update_fields=["path"]) with override_settings(POBSYNC_HOME=str(home)): response = self.client.post( reverse("apply_host_retention", args=[host.host]), { "kind": "scheduled", "max_delete": "1", "confirm_host": host.host, }, follow=True, ) self.assertFalse(old_dir.exists()) self.assertTrue(new_dir.exists()) self.assertRedirects(response, f"{reverse('host_retention_plan', args=[host.host])}?kind=scheduled") self.assertContains(response, "Retention deleted 1 snapshot(s) for web-01.") self.assertFalse(SnapshotRecord.objects.filter(pk=old_snapshot.pk).exists()) self.assertTrue(SnapshotRecord.objects.filter(pk=new_snapshot.pk).exists()) def test_retention_apply_rejects_bad_confirmation(self) -> None: self.client.force_login(self.staff_user) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") self._snapshot(host, "20260518-021500Z__OLDSNAP") response = self.client.post( reverse("apply_host_retention", args=[host.host]), { "kind": "scheduled", "max_delete": "1", "confirm_host": "wrong", }, follow=True, ) self.assertRedirects(response, reverse("host_retention_plan", args=[host.host])) self.assertContains(response, "Retention apply confirmation is invalid.") self.assertEqual(SnapshotRecord.objects.count(), 1) def test_retention_apply_requires_post(self) -> None: self.client.force_login(self.staff_user) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") response = self.client.get(reverse("apply_host_retention", args=[host.host])) self.assertEqual(response.status_code, 405) def test_schedule_form_renders_defaults_for_new_schedule(self) -> None: self.client.force_login(self.staff_user) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") response = self.client.get(reverse("edit_host_schedule", args=[host.host])) self.assertEqual(response.status_code, 200) self.assertContains(response, "Create Schedule") self.assertContains(response, "Schedule expression") self.assertContains(response, "evaluated by the pobsync scheduler service") self.assertContains(response, "15 2 * * *") self.assertContains(response, "Save schedule") def test_schedule_form_creates_schedule(self) -> None: self.client.force_login(self.staff_user) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") response = self.client.post( reverse("edit_host_schedule", args=[host.host]), { "cron_expr": "30 3 * * *", "enabled": "on", "prune": "on", "prune_max_delete": "4", "prune_protect_bases": "on", }, follow=True, ) self.assertRedirects(response, reverse("host_detail", args=[host.host])) self.assertContains(response, "Schedule saved for web-01.") schedule = ScheduleConfig.objects.get(host=host) self.assertEqual(schedule.cron_expr, "30 3 * * *") self.assertTrue(schedule.enabled) self.assertTrue(schedule.prune) self.assertEqual(schedule.prune_max_delete, 4) self.assertTrue(schedule.prune_protect_bases) def test_schedule_form_updates_existing_schedule(self) -> None: self.client.force_login(self.staff_user) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") schedule = ScheduleConfig.objects.create(host=host, cron_expr="15 2 * * *", enabled=True, prune=True) response = self.client.post( reverse("edit_host_schedule", args=[host.host]), { "cron_expr": "45 4 * * 1", "prune_max_delete": "8", }, follow=True, ) self.assertRedirects(response, reverse("host_detail", args=[host.host])) schedule.refresh_from_db() self.assertEqual(schedule.cron_expr, "45 4 * * 1") self.assertFalse(schedule.enabled) self.assertFalse(schedule.prune) self.assertEqual(schedule.prune_max_delete, 8) def test_schedule_form_renders_existing_schedule_values(self) -> None: self.client.force_login(self.staff_user) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") ScheduleConfig.objects.create( host=host, cron_expr="45 4 * * 1", enabled=True, prune_max_delete=8, ) response = self.client.get(reverse("edit_host_schedule", args=[host.host])) self.assertEqual(response.status_code, 200) self.assertContains(response, "Edit Schedule") self.assertContains(response, 'value="45 4 * * 1"', html=False) self.assertContains(response, 'value="8"', html=False) self.assertNotContains(response, 'value="15 2 * * *"', html=False) self.assertNotContains(response, ">User<", html=False) def test_schedule_form_rejects_invalid_cron(self) -> None: self.client.force_login(self.staff_user) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") response = self.client.post( reverse("edit_host_schedule", args=[host.host]), { "cron_expr": "bad cron", "enabled": "on", "prune_max_delete": "10", }, ) self.assertEqual(response.status_code, 200) self.assertContains(response, "cron expression must have exactly 5 fields") self.assertFalse(ScheduleConfig.objects.filter(host=host).exists()) def test_host_config_form_renders_existing_values(self) -> None: self.client.force_login(self.staff_user) host = HostConfig.objects.create( host="web-01", address="web-01.example.test", includes=["/srv"], excludes_add=["*.tmp"], rsync_extra_args=["--numeric-ids"], ) response = self.client.get(reverse("edit_host_config", args=[host.host])) self.assertEqual(response.status_code, 200) self.assertContains(response, "Config: web-01") self.assertContains(response, "web-01.example.test") self.assertContains(response, "/srv") self.assertContains(response, "*.tmp") self.assertContains(response, "--numeric-ids") def test_host_config_form_renders_effective_config_check(self) -> None: self.client.force_login(self.staff_user) GlobalConfig.objects.create(name="default", backup_root="/backups", rsync_args=["--numeric-ids"]) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") response = self.client.get(reverse("edit_host_config", args=[host.host])) self.assertEqual(response.status_code, 200) self.assertContains(response, "Effective Config Check") self.assertContains(response, "Host effective rsync recursion") self.assertContains(response, "Rsync args do not include archive or recursive transfer.") def test_host_config_form_updates_host_config(self) -> None: self.client.force_login(self.staff_user) host = HostConfig.objects.create(host="web-01", address="old.example.test") response = self.client.post( reverse("edit_host_config", args=[host.host]), { "address": "new.example.test", "enabled": "on", "ssh_user": "backup", "ssh_port": "2222", "source_root": "/srv", "includes": "/srv/www\n/srv/db", "excludes_add": "*.tmp\ncache/", "excludes_replace": "", "rsync_extra_args": "--numeric-ids\n--delete", "retention_daily": "7", "retention_weekly": "4", "retention_monthly": "2", "retention_yearly": "1", }, follow=True, ) self.assertRedirects(response, reverse("host_detail", args=[host.host])) self.assertContains(response, "Host config saved for web-01.") host.refresh_from_db() self.assertEqual(host.address, "new.example.test") self.assertEqual(host.ssh_user, "backup") self.assertEqual(host.ssh_port, 2222) self.assertEqual(host.source_root, "/srv") self.assertEqual(host.includes, ["/srv/www", "/srv/db"]) self.assertEqual(host.excludes_add, ["*.tmp", "cache/"]) self.assertIsNone(host.excludes_replace) self.assertEqual(host.rsync_extra_args, ["--numeric-ids", "--delete"]) self.assertEqual(host.retention_daily, 7) self.assertEqual(host.retention_yearly, 1) def test_host_config_form_can_replace_global_excludes(self) -> None: self.client.force_login(self.staff_user) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") response = self.client.post( reverse("edit_host_config", args=[host.host]), { "address": host.address, "ssh_user": "", "ssh_port": "", "source_root": "", "includes": "", "excludes_add": "", "excludes_replace": "*.cache\nnode_modules/", "rsync_extra_args": "", "retention_daily": "14", "retention_weekly": "8", "retention_monthly": "12", "retention_yearly": "0", }, follow=True, ) self.assertRedirects(response, reverse("host_detail", args=[host.host])) host.refresh_from_db() self.assertFalse(host.enabled) self.assertEqual(host.excludes_add, []) self.assertEqual(host.excludes_replace, ["*.cache", "node_modules/"]) def _snapshot(self, host: HostConfig, dirname: str) -> SnapshotRecord: started_at = datetime.strptime(dirname.split("__", 1)[0], "%Y%m%d-%H%M%SZ").replace(tzinfo=timezone.utc) return SnapshotRecord.objects.create( host=host, kind=SnapshotRecord.Kind.SCHEDULED, dirname=dirname, path=f"/backups/{host.host}/scheduled/{dirname}", status="success", started_at=started_at, )