from __future__ import annotations import subprocess from datetime import datetime, timezone from pathlib import Path from tempfile import TemporaryDirectory from unittest.mock import patch from django.contrib.auth import get_user_model from django.core.files.uploadedfile import SimpleUploadedFile from django.test import TestCase, override_settings from django.urls import reverse from pobsync.util import write_yaml_atomic from pobsync_backend.models import BackupRun, GlobalConfig, HostConfig, ScheduleConfig, SnapshotRecord, SshCredential class ViewTests(TestCase): def setUp(self) -> None: user_model = get_user_model() self.staff_user = user_model.objects.create_user( username="admin", password="secret", is_staff=True, is_superuser=True, ) def test_dashboard_requires_staff_login(self) -> None: response = self.client.get(reverse("dashboard")) self.assertEqual(response.status_code, 302) self.assertIn("/admin/login/", response["Location"]) def test_dashboard_renders_hosts_and_latest_runs(self) -> None: self.client.force_login(self.staff_user) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") snapshot = self._snapshot(host, "20260519-021500Z__ABCDEFGH") run = BackupRun.objects.create( host=host, status=BackupRun.Status.SUCCESS, snapshot=snapshot, started_at=datetime(2026, 5, 19, 2, 15, tzinfo=timezone.utc), ) response = self.client.get(reverse("dashboard")) self.assertEqual(response.status_code, 200) self.assertContains(response, "Dashboard") self.assertContains(response, "web-01") self.assertContains(response, "20260519-021500Z__ABCDEFGH") self.assertContains(response, "success") def test_dashboard_renders_backup_trend_summary(self) -> None: self.client.force_login(self.staff_user) GlobalConfig.objects.create(name="default", backup_root="/missing-backup-root") host = HostConfig.objects.create(host="web-01", address="web-01.example.test") snapshot = self._snapshot(host, "20260519-021500Z__ABCDEFGH") run = BackupRun.objects.create( host=host, 
status=BackupRun.Status.SUCCESS, snapshot=snapshot, started_at=datetime(2026, 5, 19, 2, 15, tzinfo=timezone.utc), result={ "ok": True, "dry_run": False, "stats": { "duration_seconds": 30, "rsync": { "files_total": 100, "literal_data_bytes": 1000, "matched_data_bytes": 4000, }, "storage": { "capacity": { "available_bytes": 10_000, "used_percent": 25.0, } }, }, }, ) response = self.client.get(reverse("dashboard")) self.assertEqual(response.status_code, 200) self.assertContains(response, "Backup Root Used") self.assertContains(response, "Runs Until Full") self.assertContains(response, "10") self.assertContains(response, f"Run {run.id}") self.assertContains(response, "1000") def test_dashboard_links_latest_snapshot_for_each_host(self) -> None: self.client.force_login(self.staff_user) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") old_snapshot = self._snapshot(host, "20260518-021500Z__OLDSNAP") latest_snapshot = self._snapshot(host, "20260519-021500Z__NEWSNAP") response = self.client.get(reverse("dashboard")) self.assertEqual(response.status_code, 200) self.assertContains(response, "Latest Snapshot") self.assertContains(response, latest_snapshot.dirname) self.assertContains(response, reverse("snapshot_detail", args=[latest_snapshot.id])) self.assertNotContains(response, reverse("snapshot_detail", args=[old_snapshot.id])) def test_dashboard_prompts_for_global_config_when_database_is_empty(self) -> None: self.client.force_login(self.staff_user) response = self.client.get(reverse("dashboard")) self.assertEqual(response.status_code, 200) self.assertContains(response, "No default global config exists yet.") self.assertContains(response, reverse("edit_global_config")) self.assertContains(response, "Create global config") def test_dashboard_prompts_for_first_host_after_global_config_exists(self) -> None: self.client.force_login(self.staff_user) GlobalConfig.objects.create(name="default", backup_root="/opt/pobsync/backups") response = 
self.client.get(reverse("dashboard")) self.assertEqual(response.status_code, 200) self.assertContains(response, "Global config is ready.") self.assertContains(response, reverse("create_host_config")) self.assertContains(response, "Add first host") def test_self_check_renders_runtime_checks(self) -> None: self.client.force_login(self.staff_user) response = self.client.get(reverse("self_check")) self.assertEqual(response.status_code, 200) self.assertContains(response, "Self Check") self.assertContains(response, "Django debug") self.assertContains(response, "Database connection") self.assertContains(response, "POBSYNC_HOME") def test_logs_view_renders_filtered_journal_messages(self) -> None: self.client.force_login(self.staff_user) completed = subprocess.CompletedProcess( args=["journalctl"], returncode=0, stdout="2026-05-19 pobsync-worker.service failed backup\n2026-05-19 pobsync-web.service started\n", stderr="", ) with patch("pobsync_backend.views.shutil.which", return_value="/usr/bin/journalctl"), patch( "pobsync_backend.views.subprocess.run", return_value=completed ) as run: response = self.client.get(reverse("logs"), {"unit": "pobsync-worker.service", "priority": "0..3", "q": "failed"}) self.assertEqual(response.status_code, 200) self.assertContains(response, "Logs") self.assertContains(response, "failed backup") self.assertNotContains(response, "started") self.assertIn("-u", run.call_args.args[0]) self.assertIn("pobsync-worker.service", run.call_args.args[0]) def test_ssh_credentials_view_creates_key(self) -> None: self.client.force_login(self.staff_user) with patch("pobsync_backend.forms.validate_ssh_private_key", return_value="DERIVED PUBLIC KEY"): response = self.client.post( reverse("create_ssh_credential"), { "name": "backup-key", "private_key": "PRIVATE KEY", "public_key": "", "known_hosts": "web-01.example.test ssh-ed25519 AAAATEST", "notes": "production backup key", }, follow=True, ) self.assertRedirects(response, reverse("ssh_credentials")) 
self.assertContains(response, "SSH credential saved for backup-key.") self.assertContains(response, "backup-key") credential = SshCredential.objects.get(name="backup-key") self.assertEqual(credential.private_key, "PRIVATE KEY\n") self.assertEqual(credential.public_key, "DERIVED PUBLIC KEY") def test_ssh_credentials_view_creates_key_from_uploaded_file(self) -> None: self.client.force_login(self.staff_user) uploaded_key = SimpleUploadedFile("id_ed25519", b"UPLOADED PRIVATE KEY\n", content_type="text/plain") with patch("pobsync_backend.forms.validate_ssh_private_key", return_value="DERIVED PUBLIC KEY"): response = self.client.post( reverse("create_ssh_credential"), { "name": "backup-key", "private_key_file": uploaded_key, "private_key": "", "public_key": "", "known_hosts": "", "notes": "uploaded", }, follow=True, ) self.assertRedirects(response, reverse("ssh_credentials")) credential = SshCredential.objects.get(name="backup-key") self.assertEqual(credential.private_key, "UPLOADED PRIVATE KEY\n") self.assertEqual(credential.public_key, "DERIVED PUBLIC KEY") def test_ssh_credentials_view_generates_filesystem_key(self) -> None: self.client.force_login(self.staff_user) with TemporaryDirectory() as tmp, override_settings(POBSYNC_HOME=str(Path(tmp) / "home")): response = self.client.post( reverse("generate_ssh_credential"), { "name": "generated-key", "key_type": "ed25519", "set_global_default": "", "known_hosts": "", "notes": "generated", }, follow=True, ) self.assertRedirects(response, reverse("ssh_credentials")) self.assertContains(response, "SSH key generated for generated-key.") credential = SshCredential.objects.get(name="generated-key") self.assertTrue(credential.generated) self.assertEqual(credential.private_key, "") self.assertTrue(credential.public_key.startswith("ssh-ed25519 ")) self.assertTrue(Path(credential.key_path).exists()) def test_ssh_credentials_view_deletes_unused_generated_key(self) -> None: self.client.force_login(self.staff_user) with 
TemporaryDirectory() as tmp, override_settings(POBSYNC_HOME=str(Path(tmp) / "home")): credential = SshCredential.objects.create(name="generated-key") from pobsync_backend.ssh_keys import generate_ssh_key generate_ssh_key(credential) key_path = Path(credential.key_path) response = self.client.post(reverse("delete_ssh_credential", args=[credential.id]), follow=True) self.assertRedirects(response, reverse("ssh_credentials")) self.assertContains(response, "SSH key deleted: generated-key.") self.assertFalse(SshCredential.objects.exists()) self.assertFalse(key_path.exists()) def test_ssh_credentials_view_rejects_invalid_key(self) -> None: self.client.force_login(self.staff_user) response = self.client.post( reverse("create_ssh_credential"), { "name": "bad-key", "private_key": "not a private key", "public_key": "", "known_hosts": "", "notes": "", }, ) self.assertEqual(response.status_code, 200) self.assertContains(response, "Invalid SSH private key") self.assertFalse(SshCredential.objects.exists()) def test_ssh_credentials_view_rejects_public_key_in_private_key_field(self) -> None: self.client.force_login(self.staff_user) response = self.client.post( reverse("create_ssh_credential"), { "name": "bad-key", "private_key": "ssh-ed25519 AAAATEST root@backup", "public_key": "", "known_hosts": "", "notes": "", }, ) self.assertEqual(response.status_code, 200) self.assertContains(response, "This looks like a public key") self.assertFalse(SshCredential.objects.exists()) def test_ssh_credentials_view_rejects_mismatched_public_key(self) -> None: self.client.force_login(self.staff_user) with patch("pobsync_backend.forms.validate_ssh_private_key", return_value="ssh-ed25519 AAAADERIVED derived"): response = self.client.post( reverse("create_ssh_credential"), { "name": "bad-key", "private_key": "PRIVATE KEY", "public_key": "ssh-ed25519 AAAAOTHER root@backup", "known_hosts": "", "notes": "", }, ) self.assertEqual(response.status_code, 200) self.assertContains(response, "Public key does 
not match") self.assertFalse(SshCredential.objects.exists()) def test_ssh_credentials_view_updates_existing_key(self) -> None: self.client.force_login(self.staff_user) credential = SshCredential.objects.create(name="backup-key", private_key="OLD KEY") with patch("pobsync_backend.forms.validate_ssh_private_key", return_value="UPDATED PUBLIC KEY"): response = self.client.post( reverse("edit_ssh_credential", args=[credential.id]), { "name": "backup-key", "private_key": "UPDATED KEY", "public_key": "", "known_hosts": "", "notes": "rotated", }, follow=True, ) self.assertRedirects(response, reverse("ssh_credentials")) credential.refresh_from_db() self.assertEqual(credential.private_key, "UPDATED KEY\n") self.assertEqual(credential.public_key, "UPDATED PUBLIC KEY") self.assertEqual(credential.notes, "rotated") def test_global_config_form_creates_default_config(self) -> None: self.client.force_login(self.staff_user) credential = SshCredential.objects.create(name="backup-key", private_key="PRIVATE KEY") response = self.client.post( reverse("edit_global_config"), { "name": "default", "default_ssh_credential": str(credential.id), "ssh_user": "backup", "ssh_port": "2222", "ssh_options": "StrictHostKeyChecking=no\nBatchMode=yes", "rsync_binary": "rsync", "rsync_args": "-a\n--numeric-ids", "rsync_extra_args": "--delete", "rsync_timeout_seconds": "60", "rsync_bwlimit_kbps": "1000", "default_source_root": "/srv", "default_destination_subdir": "rootfs", "excludes_default": "*.tmp\ncache/", "retention_daily": "7", "retention_weekly": "4", "retention_monthly": "2", "retention_yearly": "1", }, follow=True, ) self.assertRedirects(response, reverse("dashboard")) self.assertContains(response, "Global config saved for default.") config = GlobalConfig.objects.get(name="default") self.assertEqual(config.backup_root, "/backups") self.assertEqual(config.pobsync_home, "/opt/pobsync") self.assertEqual(config.default_ssh_credential, credential) self.assertEqual(config.ssh_user, "backup") 
self.assertEqual(config.ssh_port, 2222) self.assertEqual(config.ssh_options, ["StrictHostKeyChecking=no", "BatchMode=yes"]) self.assertEqual(config.rsync_args, ["-a", "--numeric-ids"]) self.assertEqual(config.rsync_extra_args, ["--delete"]) self.assertEqual(config.excludes_default, ["*.tmp", "cache/"]) self.assertEqual(config.retention_daily, 7) self.assertEqual(config.retention_yearly, 1) def test_global_config_form_defaults_to_first_generated_key(self) -> None: self.client.force_login(self.staff_user) credential = SshCredential.objects.create(name="default", key_path="/var/lib/pobsync/state/ssh-credentials/1/identity") response = self.client.get(reverse("edit_global_config")) self.assertEqual(response.status_code, 200) self.assertContains(response, f'value="{credential.id}" selected') self.assertContains(response, "--archive") self.assertContains(response, "/proc/***") def test_global_config_form_renders_static_container_backup_root_on_edit(self) -> None: self.client.force_login(self.staff_user) GlobalConfig.objects.create( name="default", backup_root="/mnt/pobsync/backups", pobsync_home="/custom/legacy/home", ) response = self.client.get(reverse("edit_global_config")) self.assertEqual(response.status_code, 200) self.assertContains(response, "Backup root:") self.assertContains(response, "/backups") self.assertContains(response, "Config Check") self.assertContains(response, "Runtime backup root") self.assertNotContains(response, "/opt/pobsync/backups") self.assertNotContains(response, "Pobsync home") def test_global_config_form_renders_config_check_for_non_recursive_rsync(self) -> None: self.client.force_login(self.staff_user) GlobalConfig.objects.create(name="default", backup_root="/backups", rsync_args=["--numeric-ids"]) response = self.client.get(reverse("edit_global_config")) self.assertEqual(response.status_code, 200) self.assertContains(response, "Global rsync recursion") self.assertContains(response, "Rsync args do not include archive or recursive 
transfer.") def test_global_config_form_resets_backup_root_to_static_container_path(self) -> None: self.client.force_login(self.staff_user) GlobalConfig.objects.create( name="default", backup_root="/mnt/pobsync/backups", pobsync_home="/custom/legacy/home", ) response = self.client.post( reverse("edit_global_config"), { "name": "default", "ssh_user": "root", "ssh_port": "22", "ssh_options": "", "rsync_binary": "rsync", "rsync_args": "", "rsync_extra_args": "", "rsync_timeout_seconds": "0", "rsync_bwlimit_kbps": "0", "default_source_root": "/", "default_destination_subdir": "", "excludes_default": "", "retention_daily": "14", "retention_weekly": "8", "retention_monthly": "12", "retention_yearly": "0", }, follow=True, ) self.assertRedirects(response, reverse("dashboard")) config = GlobalConfig.objects.get(name="default") self.assertEqual(config.backup_root, "/backups") self.assertEqual(config.pobsync_home, "/opt/pobsync") def test_create_host_config_form_creates_host(self) -> None: self.client.force_login(self.staff_user) credential = SshCredential.objects.create(name="host-key", private_key="PRIVATE KEY") response = self.client.post( reverse("create_host_config"), { "host": "web-01", "address": "web-01.example.test", "enabled": "on", "ssh_credential": str(credential.id), "ssh_user": "backup", "ssh_port": "2222", "source_root": "/srv", "includes": "/srv/www\n/srv/db", "excludes_add": "*.tmp", "excludes_replace": "", "rsync_extra_args": "--numeric-ids", "retention_daily": "7", "retention_weekly": "4", "retention_monthly": "2", "retention_yearly": "1", }, follow=True, ) self.assertRedirects(response, reverse("host_detail", args=["web-01"])) self.assertContains(response, "Host config created for web-01.") host = HostConfig.objects.get(host="web-01") self.assertEqual(host.address, "web-01.example.test") self.assertEqual(host.ssh_credential, credential) self.assertEqual(host.ssh_user, "backup") self.assertEqual(host.includes, ["/srv/www", "/srv/db"]) 
self.assertEqual(host.excludes_add, ["*.tmp"]) self.assertEqual(host.rsync_extra_args, ["--numeric-ids"]) self.assertEqual(host.retention_weekly, 4) def test_create_host_config_uses_global_defaults_and_prepares_directories(self) -> None: self.client.force_login(self.staff_user) credential = SshCredential.objects.create(name="global-key", private_key="PRIVATE KEY") with TemporaryDirectory() as tmp: backup_root = Path(tmp) / "backups" GlobalConfig.objects.create( name="default", backup_root=str(backup_root), default_ssh_credential=credential, ssh_user="backup", ssh_port=2222, default_source_root="/srv", retention_daily=3, retention_weekly=2, retention_monthly=1, retention_yearly=0, ) get_response = self.client.get(reverse("create_host_config")) self.assertContains(get_response, 'value="backup"') self.assertContains(get_response, 'value="2222"') self.assertContains(get_response, 'value="/srv"') response = self.client.post( reverse("create_host_config"), { "host": "web-01", "address": "web-01.example.test", "enabled": "on", "ssh_credential": str(credential.id), "ssh_user": "backup", "ssh_port": "2222", "source_root": "/srv", "includes": "", "excludes_add": "", "excludes_replace": "", "rsync_extra_args": "", "retention_daily": "3", "retention_weekly": "2", "retention_monthly": "1", "retention_yearly": "0", }, follow=True, ) self.assertRedirects(response, reverse("host_detail", args=["web-01"])) self.assertContains(response, "prepared") self.assertTrue((backup_root / "web-01" / "scheduled").is_dir()) self.assertTrue((backup_root / "web-01" / "manual").is_dir()) self.assertTrue((backup_root / "web-01" / ".incomplete").is_dir()) def test_host_detail_renders_config_schedule_runs_and_snapshots(self) -> None: self.client.force_login(self.staff_user) GlobalConfig.objects.create(name="default", backup_root="/backups") host = HostConfig.objects.create( host="web-01", address="web-01.example.test", source_root="/srv", retention_daily=7, ) ScheduleConfig.objects.create(host=host, 
cron_expr="15 2 * * *", prune=True, last_status="success") snapshot = self._snapshot(host, "20260519-021500Z__ABCDEFGH") BackupRun.objects.create(host=host, status=BackupRun.Status.SUCCESS, snapshot=snapshot) response = self.client.get(reverse("host_detail", args=[host.host])) self.assertEqual(response.status_code, 200) self.assertContains(response, "web-01") self.assertContains(response, "web-01.example.test") self.assertContains(response, "15 2 * * *") self.assertContains(response, "20260519-021500Z__ABCDEFGH") self.assertContains(response, "Discover snapshots") self.assertContains(response, "Edit schedule") self.assertContains(response, "Edit config") self.assertContains(response, "Backup Control") self.assertContains(response, "Queue dry-run") self.assertContains(response, "Queue backup") self.assertContains(response, "Host Check") self.assertContains(response, reverse("prepare_host_directories", args=[host.host])) self.assertContains(response, "ready") self.assertContains(response, "Snapshot Discovery") self.assertContains(response, reverse("queue_manual_backup", args=[host.host])) self.assertContains(response, reverse("run_detail", args=[BackupRun.objects.get().id])) self.assertContains(response, reverse("snapshot_detail", args=[snapshot.id])) def test_host_detail_renders_backup_trends(self) -> None: self.client.force_login(self.staff_user) GlobalConfig.objects.create(name="default", backup_root="/backups") host = HostConfig.objects.create(host="web-01", address="web-01.example.test") snapshot = self._snapshot(host, "20260519-021500Z__ABCDEFGH") BackupRun.objects.create( host=host, status=BackupRun.Status.SUCCESS, snapshot=snapshot, started_at=datetime(2026, 5, 19, 2, 15, tzinfo=timezone.utc), result={ "ok": True, "dry_run": False, "stats": { "duration_seconds": 45, "rsync": { "files_total": 250, "literal_data_bytes": 2048, "matched_data_bytes": 8192, }, }, }, ) response = self.client.get(reverse("host_detail", args=[host.host])) 
self.assertEqual(response.status_code, 200) self.assertContains(response, "Backup Trends") self.assertContains(response, "Avg New Data") self.assertContains(response, "45s") self.assertContains(response, "250") self.assertContains(response, "2.0") self.assertContains(response, "KB") def test_prepare_host_directories_action_creates_missing_directories(self) -> None: self.client.force_login(self.staff_user) with TemporaryDirectory() as tmp: backup_root = Path(tmp) / "backups" GlobalConfig.objects.create(name="default", backup_root=str(backup_root)) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") response = self.client.post(reverse("prepare_host_directories", args=[host.host]), follow=True) self.assertRedirects(response, reverse("host_detail", args=[host.host])) self.assertContains(response, "Prepared backup directories") self.assertTrue((backup_root / host.host / "scheduled").is_dir()) self.assertTrue((backup_root / host.host / "manual").is_dir()) self.assertTrue((backup_root / host.host / ".incomplete").is_dir()) def test_host_detail_warns_when_rsync_is_not_recursive(self) -> None: self.client.force_login(self.staff_user) GlobalConfig.objects.create(name="default", backup_root="/backups", rsync_args=[]) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") response = self.client.get(reverse("host_detail", args=[host.host])) self.assertEqual(response.status_code, 200) self.assertContains(response, "Host effective rsync recursion") self.assertContains(response, "Rsync args do not include archive or recursive transfer.") def test_host_detail_warns_when_replace_excludes_drops_root_defaults(self) -> None: self.client.force_login(self.staff_user) GlobalConfig.objects.create( name="default", backup_root="/backups", rsync_args=["--archive"], excludes_default=["/proc/***", "/sys/***", "/dev/***", "/run/***", "/tmp/***"], ) host = HostConfig.objects.create( host="web-01", address="web-01.example.test", 
excludes_replace=[], ) response = self.client.get(reverse("host_detail", args=[host.host])) self.assertEqual(response.status_code, 200) self.assertContains(response, "Effective root excludes") self.assertContains(response, "excludes_replace is active") def test_host_detail_warns_that_includes_are_raw_rsync_rules(self) -> None: self.client.force_login(self.staff_user) GlobalConfig.objects.create(name="default", backup_root="/backups", rsync_args=["--archive"]) host = HostConfig.objects.create( host="web-01", address="web-01.example.test", includes=["/srv/www"], ) response = self.client.get(reverse("host_detail", args=[host.host])) self.assertEqual(response.status_code, 200) self.assertContains(response, "Host includes") self.assertContains(response, "Includes are passed to rsync as raw --include rules.") def test_scan_host_known_key_action_updates_selected_credential(self) -> None: self.client.force_login(self.staff_user) credential = SshCredential.objects.create(name="default-key", key_path="/var/lib/pobsync/state/ssh-credentials/1/identity") GlobalConfig.objects.create(name="default", backup_root="/backups", default_ssh_credential=credential, ssh_port=2222) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") with patch( "pobsync_backend.views.scan_known_host", return_value="web-01.example.test ssh-ed25519 AAAASCANNED", ) as scan: response = self.client.post(reverse("scan_host_known_key", args=[host.host]), follow=True) self.assertRedirects(response, reverse("host_detail", args=[host.host])) self.assertContains(response, "Stored SSH host key for web-01") scan.assert_called_once_with("web-01.example.test", port=2222) credential.refresh_from_db() self.assertEqual(credential.known_hosts, "web-01.example.test ssh-ed25519 AAAASCANNED\n") def test_host_detail_surfaces_active_backup_run(self) -> None: self.client.force_login(self.staff_user) GlobalConfig.objects.create(name="default", backup_root="/backups") host = 
HostConfig.objects.create(host="web-01", address="web-01.example.test") run = BackupRun.objects.create(host=host, status=BackupRun.Status.QUEUED) response = self.client.get(reverse("host_detail", args=[host.host])) self.assertEqual(response.status_code, 200) self.assertContains(response, "queued") self.assertContains(response, f"Run {run.id}") self.assertContains(response, reverse("run_detail", args=[run.id])) def test_host_detail_disables_backup_controls_without_global_config(self) -> None: self.client.force_login(self.staff_user) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") response = self.client.get(reverse("host_detail", args=[host.host])) self.assertEqual(response.status_code, 200) self.assertContains(response, "missing global config") self.assertContains(response, "Create the default global config before queueing backups.") def test_host_detail_renders_discovery_status_for_existing_snapshot_dirs(self) -> None: self.client.force_login(self.staff_user) with TemporaryDirectory() as tmp: backup_root = Path(tmp) / "backups" GlobalConfig.objects.create(name="default", backup_root=str(backup_root)) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") (backup_root / host.host / "scheduled" / "20260519-021500Z__ABCDEFGH").mkdir(parents=True) (backup_root / host.host / ".incomplete" / "20260519-031500Z__BROKEN01").mkdir(parents=True) response = self.client.get(reverse("host_detail", args=[host.host])) self.assertEqual(response.status_code, 200) self.assertContains(response, f"Host root: {backup_root / host.host}") self.assertContains(response, "Found 2 snapshot directories") self.assertContains(response, "scheduled 1") self.assertContains(response, "incomplete 1") def test_host_detail_returns_404_for_unknown_host(self) -> None: self.client.force_login(self.staff_user) response = self.client.get(reverse("host_detail", args=["missing-host"])) self.assertEqual(response.status_code, 404) def 
test_queue_manual_backup_creates_queued_run_and_redirects_to_run_detail(self) -> None: self.client.force_login(self.staff_user) GlobalConfig.objects.create(name="default", backup_root="/backups") host = HostConfig.objects.create(host="web-01", address="web-01.example.test") response = self.client.post( reverse("queue_manual_backup", args=[host.host]), { "dry_run": "on", "verbose_output": "on", "prune": "on", "prune_max_delete": "4", "prune_protect_bases": "on", }, follow=True, ) run = BackupRun.objects.get(host=host) self.assertRedirects(response, reverse("run_detail", args=[run.id])) self.assertContains(response, f"Queued manual backup run {run.id} for web-01.") self.assertEqual(run.status, BackupRun.Status.QUEUED) self.assertEqual(run.run_type, BackupRun.RunType.MANUAL) self.assertEqual( run.result["requested"], { "dry_run": True, "verbose_output": True, "prune": True, "prune_max_delete": 4, "prune_protect_bases": True, }, ) def test_queue_manual_backup_quick_action_can_queue_real_backup(self) -> None: self.client.force_login(self.staff_user) GlobalConfig.objects.create(name="default", backup_root="/backups") host = HostConfig.objects.create(host="web-01", address="web-01.example.test") response = self.client.post( reverse("queue_manual_backup", args=[host.host]), {"prune_max_delete": "10"}, follow=True, ) run = BackupRun.objects.get(host=host) self.assertRedirects(response, reverse("run_detail", args=[run.id])) self.assertEqual( run.result["requested"], { "dry_run": False, "verbose_output": False, "prune": False, "prune_max_delete": 10, "prune_protect_bases": False, }, ) def test_queue_manual_backup_requires_default_global_config(self) -> None: self.client.force_login(self.staff_user) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") response = self.client.post(reverse("queue_manual_backup", args=[host.host]), {"dry_run": "on"}, follow=True) self.assertRedirects(response, reverse("host_detail", args=[host.host])) 
self.assertContains(response, "Create the default global config before queueing backups.") self.assertFalse(BackupRun.objects.exists()) def test_queue_manual_backup_rejects_disabled_host(self) -> None: self.client.force_login(self.staff_user) GlobalConfig.objects.create(name="default", backup_root="/backups") host = HostConfig.objects.create(host="web-01", address="web-01.example.test", enabled=False) response = self.client.post(reverse("queue_manual_backup", args=[host.host]), {"dry_run": "on"}, follow=True) self.assertRedirects(response, reverse("host_detail", args=[host.host])) self.assertContains(response, "Cannot queue backup for disabled host web-01.") self.assertFalse(BackupRun.objects.exists()) def test_queue_manual_backup_requires_post(self) -> None: self.client.force_login(self.staff_user) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") response = self.client.get(reverse("queue_manual_backup", args=[host.host])) self.assertEqual(response.status_code, 405) def test_run_detail_renders_result_payload(self) -> None: self.client.force_login(self.staff_user) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") snapshot = self._snapshot(host, "20260519-021500Z__ABCDEFGH") run = BackupRun.objects.create( host=host, status=BackupRun.Status.SUCCESS, snapshot=snapshot, snapshot_path=snapshot.path, base_path="/backups/web-01/scheduled/base", rsync_exit_code=0, result={ "ok": True, "snapshot": snapshot.path, "requested": { "dry_run": True, "verbose_output": True, "prune": False, "prune_max_delete": 10, "prune_protect_bases": False, }, "stats": { "duration_seconds": 12, "rsync": { "files_total": 10, "files_transferred": 2, "total_file_size_bytes": 2000, "total_transferred_file_size_bytes": 500, "literal_data_bytes": 500, "matched_data_bytes": 1500, "link_dest_estimated_savings_bytes": 1500, }, }, }, ) response = self.client.get(reverse("run_detail", args=[run.id])) self.assertEqual(response.status_code, 200) 
self.assertContains(response, "Run") self.assertContains(response, "web-01") self.assertContains(response, "success") self.assertContains(response, "ABCDEFGH") self.assertContains(response, "Requested Options") self.assertContains(response, "Dry run: yes") self.assertContains(response, "Verbose rsync output: yes") self.assertContains(response, "Stats") self.assertContains(response, "Files seen: 10") self.assertContains(response, "Estimated link-dest saving") self.assertContains(response, ""ok": true") self.assertContains(response, reverse("snapshot_detail", args=[snapshot.id]))

    # NOTE(review): the line structure of the methods below was reconstructed
    # from whitespace-collapsed source; confirm the extent of each `with` block.

    def test_run_detail_offers_cancel_for_running_run(self) -> None:
        """Check that the detail page of a RUNNING run offers cancellation.

        The page must return 200 and contain both the text 'Cancel run' and
        the URL of the ``cancel_run`` view for that run.
        """
        self.client.force_login(self.staff_user)
        host = HostConfig.objects.create(host="web-01", address="web-01.example.test")
        run = BackupRun.objects.create(host=host, status=BackupRun.Status.RUNNING)

        response = self.client.get(reverse("run_detail", args=[run.id]))

        self.assertEqual(response.status_code, 200)
        self.assertContains(response, "Cancel run")
        self.assertContains(response, reverse("cancel_run", args=[run.id]))

    def test_cancel_run_marks_queued_run_cancelled(self) -> None:
        """Check that cancelling a QUEUED run stores it as CANCELLED.

        After the POST (redirects followed) the run must be CANCELLED, have
        ``ended_at`` set, and record QUEUED as the previous status in
        ``result["cancellation"]``.
        """
        self.client.force_login(self.staff_user)
        host = HostConfig.objects.create(host="web-01", address="web-01.example.test")
        run = BackupRun.objects.create(host=host, status=BackupRun.Status.QUEUED)

        response = self.client.post(reverse("cancel_run", args=[run.id]), follow=True)

        self.assertRedirects(response, reverse("run_detail", args=[run.id]))
        self.assertContains(response, "Cancellation requested")
        run.refresh_from_db()
        self.assertEqual(run.status, BackupRun.Status.CANCELLED)
        self.assertIsNotNone(run.ended_at)
        self.assertEqual(run.result["cancellation"]["previous_status"], BackupRun.Status.QUEUED)

    def test_cancel_run_requests_running_run_cancellation(self) -> None:
        """Check the stored state after cancelling a RUNNING run.

        Unlike the queued case, ``ended_at`` is expected to stay ``None``;
        the status is CANCELLED and the previous status recorded is RUNNING.
        """
        self.client.force_login(self.staff_user)
        host = HostConfig.objects.create(host="web-01", address="web-01.example.test")
        run = BackupRun.objects.create(host=host, status=BackupRun.Status.RUNNING)

        response = self.client.post(reverse("cancel_run", args=[run.id]), follow=True)

        self.assertRedirects(response, reverse("run_detail", args=[run.id]))
        run.refresh_from_db()
        self.assertEqual(run.status, BackupRun.Status.CANCELLED)
        self.assertIsNone(run.ended_at)
        self.assertEqual(run.result["cancellation"]["previous_status"], BackupRun.Status.RUNNING)

    def test_snapshot_detail_renders_metadata_runs_and_children(self) -> None:
        """Check that a snapshot detail page shows metadata, runs and children.

        A base snapshot is given stats metadata, a child snapshot pointing at
        it, and a successful run. The page must contain the base dirname and
        snapshot id, selected stats lines, the child dirname, and links to the
        run and to the child snapshot.
        """
        self.client.force_login(self.staff_user)
        host = HostConfig.objects.create(host="web-01", address="web-01.example.test")
        base = self._snapshot(host, "20260518-021500Z__BASESNAP")
        base.metadata = {
            "status": "success",
            "snapshot_id": "BASESNAP",
            "stats": {
                "duration_seconds": 20,
                "rsync": {
                    "files_total": 100,
                    "files_transferred": 4,
                    "total_file_size_bytes": 10_000,
                    "link_dest_estimated_savings_bytes": 7_000,
                },
                "storage": {
                    "snapshot": {
                        "apparent_size_bytes": 10_000,
                        "allocated_size_bytes": 3_000,
                        "hardlinked_files": 9,
                    },
                    "capacity": {
                        "used_percent": 30.5,
                        "available_bytes": 1_000_000,
                    },
                },
            },
        }
        base.save(update_fields=["metadata"])
        # The child references the base both by relation and by dirname.
        child = self._snapshot(host, "20260519-021500Z__CHILDSNP")
        child.base = base
        child.base_dirname = base.dirname
        child.save(update_fields=["base", "base_dirname"])
        run = BackupRun.objects.create(host=host, status=BackupRun.Status.SUCCESS, snapshot=base)

        response = self.client.get(reverse("snapshot_detail", args=[base.id]))

        self.assertEqual(response.status_code, 200)
        self.assertContains(response, base.dirname)
        self.assertContains(response, "BASESNAP")
        self.assertContains(response, "Stats")
        self.assertContains(response, "Files seen: 100")
        self.assertContains(response, "Hardlinked files: 9")
        self.assertContains(response, child.dirname)
        self.assertContains(response, f"Run {run.id}")
        self.assertContains(response, reverse("run_detail", args=[run.id]))
        self.assertContains(response, reverse("snapshot_detail", args=[child.id]))

    def test_discover_host_snapshots_action_discovers_and_redirects(self) -> None:
        """Check that snapshot discovery records an on-disk snapshot.

        A snapshot directory with ``meta/meta.yaml`` is created under a
        temporary backup root. The POST must redirect to the host detail
        page, report one scanned item, and leave a matching SnapshotRecord.
        """
        self.client.force_login(self.staff_user)
        with TemporaryDirectory() as tmp:
            backup_root = Path(tmp) / "backups"
            GlobalConfig.objects.create(name="default", backup_root=str(backup_root))
            host = HostConfig.objects.create(host="web-01", address="web-01.example.test")
            snapshot_dir = backup_root / host.host / "scheduled" / "20260519-021500Z__ABCDEFGH"
            meta_dir = snapshot_dir / "meta"
            meta_dir.mkdir(parents=True)
            write_yaml_atomic(meta_dir / "meta.yaml", {"status": "success", "started_at": "2026-05-19T02:15:00Z"})

            response = self.client.post(reverse("discover_host_snapshots", args=[host.host]), follow=True)

            self.assertRedirects(response, reverse("host_detail", args=[host.host]))
            self.assertContains(response, "Snapshot discovery scanned 1 items")
            self.assertTrue(SnapshotRecord.objects.filter(host=host, dirname=snapshot_dir.name).exists())

    def test_discover_host_snapshots_warns_when_host_root_is_missing(self) -> None:
        """Check discovery when nothing exists under the backup root.

        The backup root path is configured but never created. The POST must
        still redirect to the host detail page, report zero scanned items
        plus a missing-directory warning, and create no SnapshotRecord.
        """
        self.client.force_login(self.staff_user)
        with TemporaryDirectory() as tmp:
            backup_root = Path(tmp) / "backups"
            GlobalConfig.objects.create(name="default", backup_root=str(backup_root))
            host = HostConfig.objects.create(host="web-01", address="web-01.example.test")

            response = self.client.post(reverse("discover_host_snapshots", args=[host.host]), follow=True)

            self.assertRedirects(response, reverse("host_detail", args=[host.host]))
            self.assertContains(response, "Snapshot discovery scanned 0 items")
            self.assertContains(response, "Host backup directory does not exist yet")
            self.assertFalse(SnapshotRecord.objects.exists())

    def test_discover_host_snapshots_requires_post(self) -> None:
        """Check that a GET to the discovery action is answered with 405."""
        self.client.force_login(self.staff_user)
        host = HostConfig.objects.create(host="web-01", address="web-01.example.test")

        response = self.client.get(reverse("discover_host_snapshots", args=[host.host]))

        self.assertEqual(response.status_code, 405)

    def test_retention_plan_renders_keep_and_delete_sets(self) -> None:
        """Check the retention plan page for a host with two snapshots.

        All retention counts are 0. The page must return 200 and contain the
        plan title, both snapshot dirnames, the text 'newest', and the text
        'Would Delete'.
        """
        self.client.force_login(self.staff_user)
        host = HostConfig.objects.create(
            host="web-01",
            address="web-01.example.test",
            retention_daily=0,
            retention_weekly=0,
            retention_monthly=0,
            retention_yearly=0,
        )
        old_snapshot = self._snapshot(host, "20260518-021500Z__OLDSNAP")
        new_snapshot = self._snapshot(host, "20260519-021500Z__NEWSNAP")

        response = self.client.get(reverse("host_retention_plan", args=[host.host]))

        self.assertEqual(response.status_code, 200)
        self.assertContains(response, "Retention Plan: web-01")
        self.assertContains(response, old_snapshot.dirname)
        self.assertContains(response, new_snapshot.dirname)
        self.assertContains(response, "newest")
        self.assertContains(response, "Would Delete")

    def test_retention_plan_can_enable_base_protection(self) -> None:
        """Check the retention plan page when ``protect_bases=1`` is passed.

        With a child snapshot whose ``base`` is another snapshot, the page
        must contain 'Protect bases: yes' and a ``base-of:<child dirname>``
        marker.
        """
        self.client.force_login(self.staff_user)
        host = HostConfig.objects.create(
            host="web-01",
            address="web-01.example.test",
            retention_daily=0,
            retention_weekly=0,
            retention_monthly=0,
            retention_yearly=0,
        )
        base = self._snapshot(host, "20260518-021500Z__BASESNAP")
        child = self._snapshot(host, "20260519-021500Z__CHILDSNP")
        child.base = base
        child.save(update_fields=["base"])

        response = self.client.get(reverse("host_retention_plan", args=[host.host]), {"protect_bases": "1"})

        self.assertEqual(response.status_code, 200)
        self.assertContains(response, "Protect bases: yes")
        self.assertContains(response, f"base-of:{child.dirname}")

    def test_retention_plan_rejects_invalid_kind(self) -> None:
        """Check that an unknown ``kind`` redirects to host detail with an error message."""
        self.client.force_login(self.staff_user)
        host = HostConfig.objects.create(host="web-01", address="web-01.example.test")

        response = self.client.get(reverse("host_retention_plan", args=[host.host]), {"kind": "weird"}, follow=True)

        self.assertRedirects(response, reverse("host_detail", args=[host.host]))
        self.assertContains(response, "Retention kind must be scheduled, manual, or all.")

    def test_retention_apply_deletes_planned_snapshot_after_confirmation(self) -> None:
        """Check that a confirmed retention apply deletes only the older snapshot.

        Two snapshot directories are created on disk with records pointing at
        them, and all retention counts are 0. After posting with
        ``confirm_host`` equal to the host name and ``max_delete`` of 1, the
        older directory and record must be gone and the newer ones must
        remain.
        """
        self.client.force_login(self.staff_user)
        with TemporaryDirectory() as tmp:
            home = Path(tmp) / "home"
            host = HostConfig.objects.create(
                host="web-01",
                address="web-01.example.test",
                retention_daily=0,
                retention_weekly=0,
                retention_monthly=0,
                retention_yearly=0,
            )
            old_dir = Path(tmp) / "backups" / host.host / "scheduled" / "20260518-021500Z__OLDSNAP"
            new_dir = Path(tmp) / "backups" / host.host / "scheduled" / "20260519-021500Z__NEWSNAP"
            old_dir.mkdir(parents=True)
            new_dir.mkdir(parents=True)
            # _snapshot() stores a default "/backups/..." path; point the
            # records at the temporary directories instead.
            old_snapshot = self._snapshot(host, old_dir.name)
            old_snapshot.path = str(old_dir)
            old_snapshot.save(update_fields=["path"])
            new_snapshot = self._snapshot(host, new_dir.name)
            new_snapshot.path = str(new_dir)
            new_snapshot.save(update_fields=["path"])

            # NOTE(review): assumed that only the request runs under
            # override_settings and the assertions follow it — confirm.
            with override_settings(POBSYNC_HOME=str(home)):
                response = self.client.post(
                    reverse("apply_host_retention", args=[host.host]),
                    {
                        "kind": "scheduled",
                        "max_delete": "1",
                        "confirm_host": host.host,
                    },
                    follow=True,
                )

            self.assertFalse(old_dir.exists())
            self.assertTrue(new_dir.exists())
            self.assertRedirects(response, f"{reverse('host_retention_plan', args=[host.host])}?kind=scheduled")
            self.assertContains(response, "Retention deleted 1 snapshot(s) for web-01.")
            self.assertFalse(SnapshotRecord.objects.filter(pk=old_snapshot.pk).exists())
            self.assertTrue(SnapshotRecord.objects.filter(pk=new_snapshot.pk).exists())

    def test_retention_apply_rejects_bad_confirmation(self) -> None:
        """Check that a wrong ``confirm_host`` value prevents any deletion.

        The response must redirect to the retention plan with an
        invalid-confirmation message, and the single SnapshotRecord must
        still exist.
        """
        self.client.force_login(self.staff_user)
        host = HostConfig.objects.create(host="web-01", address="web-01.example.test")
        self._snapshot(host, "20260518-021500Z__OLDSNAP")

        response = self.client.post(
            reverse("apply_host_retention", args=[host.host]),
            {
                "kind": "scheduled",
                "max_delete": "1",
                "confirm_host": "wrong",
            },
            follow=True,
        )

        self.assertRedirects(response, reverse("host_retention_plan", args=[host.host]))
        self.assertContains(response, "Retention apply confirmation is invalid.")
        self.assertEqual(SnapshotRecord.objects.count(), 1)

    def test_retention_apply_requires_post(self) -> None:
        """Check that a GET to the retention apply action is answered with 405."""
        self.client.force_login(self.staff_user)
        host = HostConfig.objects.create(host="web-01", address="web-01.example.test")

        response = self.client.get(reverse("apply_host_retention", args=[host.host]))

        self.assertEqual(response.status_code, 405)

    def test_schedule_form_renders_defaults_for_new_schedule(self) -> None:
        """Check the schedule form for a host that has no schedule yet.

        The page must return 200 and contain 'Create Schedule', the cron
        expression '15 2 * * *', and 'Save schedule'.
        """
        self.client.force_login(self.staff_user)
        host = HostConfig.objects.create(host="web-01", address="web-01.example.test")

        response = self.client.get(reverse("edit_host_schedule", args=[host.host]))

        self.assertEqual(response.status_code, 200)
        self.assertContains(response, "Create Schedule")
        self.assertContains(response, "15 2 * * *")
        self.assertContains(response, "Save schedule")

    def test_schedule_form_creates_schedule(self) -> None:
        """Check that posting the schedule form creates a ScheduleConfig.

        The stored schedule must carry the posted cron expression, the three
        checked flags as ``True``, and ``prune_max_delete`` as the integer 4.
        """
        self.client.force_login(self.staff_user)
        host = HostConfig.objects.create(host="web-01", address="web-01.example.test")

        response = self.client.post(
            reverse("edit_host_schedule", args=[host.host]),
            {
                "cron_expr": "30 3 * * *",
                "user": "root",
                "enabled": "on",
                "prune": "on",
                "prune_max_delete": "4",
                "prune_protect_bases": "on",
            },
            follow=True,
        )

        self.assertRedirects(response, reverse("host_detail", args=[host.host]))
        self.assertContains(response, "Schedule saved for web-01.")
        schedule = ScheduleConfig.objects.get(host=host)
        self.assertEqual(schedule.cron_expr, "30 3 * * *")
        self.assertTrue(schedule.enabled)
        self.assertTrue(schedule.prune)
        self.assertEqual(schedule.prune_max_delete, 4)
        self.assertTrue(schedule.prune_protect_bases)

    def test_schedule_form_updates_existing_schedule(self) -> None:
        """Check that posting the schedule form updates an existing schedule.

        The POST omits ``enabled`` and ``prune``; the test expects both to be
        stored as ``False`` afterwards, alongside the new cron expression,
        user, and ``prune_max_delete``.
        """
        self.client.force_login(self.staff_user)
        host = HostConfig.objects.create(host="web-01", address="web-01.example.test")
        schedule = ScheduleConfig.objects.create(host=host, cron_expr="15 2 * * *", enabled=True, prune=True)

        response = self.client.post(
            reverse("edit_host_schedule", args=[host.host]),
            {
                "cron_expr": "45 4 * * 1",
                "user": "backup",
                "prune_max_delete": "8",
            },
            follow=True,
        )

        self.assertRedirects(response, reverse("host_detail", args=[host.host]))
        schedule.refresh_from_db()
        self.assertEqual(schedule.cron_expr, "45 4 * * 1")
        self.assertEqual(schedule.user, "backup")
        self.assertFalse(schedule.enabled)
        self.assertFalse(schedule.prune)
        self.assertEqual(schedule.prune_max_delete, 8)

    def test_schedule_form_rejects_invalid_cron(self) -> None:
        """Check that an invalid cron expression is rejected.

        The form must re-render with status 200 and a five-fields error
        message, and no ScheduleConfig may be created for the host.
        """
        self.client.force_login(self.staff_user)
        host = HostConfig.objects.create(host="web-01", address="web-01.example.test")

        response = self.client.post(
            reverse("edit_host_schedule", args=[host.host]),
            {
                "cron_expr": "bad cron",
                "user": "root",
                "enabled": "on",
                "prune_max_delete": "10",
            },
        )

        self.assertEqual(response.status_code, 200)
        self.assertContains(response, "cron expression must have exactly 5 fields")
        self.assertFalse(ScheduleConfig.objects.filter(host=host).exists())

    def test_host_config_form_renders_existing_values(self) -> None:
        """Check that the host config form shows the host's stored values.

        The page must contain the config title, the address, and the stored
        include, exclude, and extra rsync argument entries.
        """
        self.client.force_login(self.staff_user)
        host = HostConfig.objects.create(
            host="web-01",
            address="web-01.example.test",
            includes=["/srv"],
            excludes_add=["*.tmp"],
            rsync_extra_args=["--numeric-ids"],
        )

        response = self.client.get(reverse("edit_host_config", args=[host.host]))

        self.assertEqual(response.status_code, 200)
        self.assertContains(response, "Config: web-01")
        self.assertContains(response, "web-01.example.test")
        self.assertContains(response, "/srv")
        self.assertContains(response, "*.tmp")
        self.assertContains(response, "--numeric-ids")

    def test_host_config_form_renders_effective_config_check(self) -> None:
        """Check that the host config page shows the effective config check.

        The global config's rsync args contain only ``--numeric-ids``. The
        page must show the 'Effective Config Check' section and the warning
        that the rsync args include no archive or recursive transfer.
        """
        self.client.force_login(self.staff_user)
        GlobalConfig.objects.create(name="default", backup_root="/backups", rsync_args=["--numeric-ids"])
        host = HostConfig.objects.create(host="web-01", address="web-01.example.test")

        response = self.client.get(reverse("edit_host_config", args=[host.host]))

        self.assertEqual(response.status_code, 200)
        self.assertContains(response, "Effective Config Check")
        self.assertContains(response, "Host effective rsync recursion")
        self.assertContains(response, "Rsync args do not include archive or recursive transfer.")

    def test_host_config_form_updates_host_config(self) -> None:
        """Check that posting the host config form stores the new values.

        Newline-separated text fields are expected to be stored as lists, an
        empty ``excludes_replace`` as ``None``, and ``ssh_port`` as an int.
        """
        self.client.force_login(self.staff_user)
        host = HostConfig.objects.create(host="web-01", address="old.example.test")

        response = self.client.post(
            reverse("edit_host_config", args=[host.host]),
            {
                "address": "new.example.test",
                "enabled": "on",
                "ssh_user": "backup",
                "ssh_port": "2222",
                "source_root": "/srv",
                "includes": "/srv/www\n/srv/db",
                "excludes_add": "*.tmp\ncache/",
                "excludes_replace": "",
                "rsync_extra_args": "--numeric-ids\n--delete",
                "retention_daily": "7",
                "retention_weekly": "4",
                "retention_monthly": "2",
                "retention_yearly": "1",
            },
            follow=True,
        )

        self.assertRedirects(response, reverse("host_detail", args=[host.host]))
        self.assertContains(response, "Host config saved for web-01.")
        host.refresh_from_db()
        self.assertEqual(host.address, "new.example.test")
        self.assertEqual(host.ssh_user, "backup")
        self.assertEqual(host.ssh_port, 2222)
        self.assertEqual(host.source_root, "/srv")
        self.assertEqual(host.includes, ["/srv/www", "/srv/db"])
        self.assertEqual(host.excludes_add, ["*.tmp", "cache/"])
        self.assertIsNone(host.excludes_replace)
        self.assertEqual(host.rsync_extra_args, ["--numeric-ids", "--delete"])
        self.assertEqual(host.retention_daily, 7)
        self.assertEqual(host.retention_yearly, 1)

    def test_host_config_form_can_replace_global_excludes(self) -> None:
        """Check that a non-empty ``excludes_replace`` is stored as a list.

        The POST omits ``enabled`` and leaves ``excludes_add`` empty; the
        test expects ``enabled`` to be ``False`` and ``excludes_add`` to be
        ``[]`` afterwards.
        """
        self.client.force_login(self.staff_user)
        host = HostConfig.objects.create(host="web-01", address="web-01.example.test")

        response = self.client.post(
            reverse("edit_host_config", args=[host.host]),
            {
                "address": host.address,
                "ssh_user": "",
                "ssh_port": "",
                "source_root": "",
                "includes": "",
                "excludes_add": "",
                "excludes_replace": "*.cache\nnode_modules/",
                "rsync_extra_args": "",
                "retention_daily": "14",
                "retention_weekly": "8",
                "retention_monthly": "12",
                "retention_yearly": "0",
            },
            follow=True,
        )

        self.assertRedirects(response, reverse("host_detail", args=[host.host]))
        host.refresh_from_db()
        self.assertFalse(host.enabled)
        self.assertEqual(host.excludes_add, [])
        self.assertEqual(host.excludes_replace, ["*.cache", "node_modules/"])

    def _snapshot(self, host: HostConfig, dirname: str) -> SnapshotRecord:
        """Create and return a SCHEDULED SnapshotRecord for ``host``.

        ``started_at`` is parsed from the part of ``dirname`` before the
        first ``__`` using the format ``%Y%m%d-%H%M%SZ`` and tagged as UTC.
        The record's path is ``/backups/<host>/scheduled/<dirname>`` and its
        status is ``"success"``.
        """
        started_at = datetime.strptime(dirname.split("__", 1)[0], "%Y%m%d-%H%M%SZ").replace(tzinfo=timezone.utc)
        return SnapshotRecord.objects.create(
            host=host,
            kind=SnapshotRecord.Kind.SCHEDULED,
            dirname=dirname,
            path=f"/backups/{host.host}/scheduled/{dirname}",
            status="success",
            started_at=started_at,
        )