# File: pobsync/src/pobsync_backend/tests/test_views.py
# 1909 lines, 86 KiB, Python

from __future__ import annotations
import subprocess
from datetime import datetime, timezone
from pathlib import Path
from tempfile import TemporaryDirectory
from unittest.mock import patch
from django.contrib.auth import get_user_model
from django.core.files.uploadedfile import SimpleUploadedFile
from django.test import TestCase, override_settings
from django.urls import reverse
from pobsync.util import write_yaml_atomic
from pobsync_backend.models import BackupRun, GlobalConfig, HostConfig, ScheduleConfig, SnapshotRecord, SshCredential
class ViewTests(TestCase):
def setUp(self) -> None:
    """Create the staff superuser account stored on ``self.staff_user``."""
    account_fields = {
        "username": "admin",
        "password": "secret",
        "is_staff": True,
        "is_superuser": True,
    }
    self.staff_user = get_user_model().objects.create_user(**account_fields)
def test_dashboard_requires_staff_login(self) -> None:
    """A dashboard request without a login is redirected to the admin login URL."""
    dashboard_url = reverse("dashboard")
    reply = self.client.get(dashboard_url)
    self.assertEqual(reply.status_code, 302)
    self.assertIn("/admin/login/", reply["Location"])
def test_dashboard_renders_hosts_and_latest_runs(self) -> None:
    """Dashboard shows the host, its runs, and one-per-status summaries.

    Creates a success, warning, queued, running, and failed run for a single
    host and checks the rendered page for the matching labels and counters.
    """
    self.client.force_login(self.staff_user)
    web_host = HostConfig.objects.create(host="web-01", address="web-01.example.test")
    good_snapshot = self._snapshot(web_host, "20260519-021500Z__ABCDEFGH")
    good_run = BackupRun.objects.create(
        host=web_host,
        run_type=BackupRun.RunType.MANUAL,
        status=BackupRun.Status.SUCCESS,
        snapshot=good_snapshot,
        started_at=datetime(2026, 5, 19, 2, 15, tzinfo=timezone.utc),
    )
    # The overall result is ok; only the prune step reports a failure.
    prune_outcome = {"ok": False, "error": "Retention warning"}
    warned_run = BackupRun.objects.create(
        host=web_host,
        run_type=BackupRun.RunType.SCHEDULED,
        status=BackupRun.Status.WARNING,
        started_at=datetime(2026, 5, 19, 3, 15, tzinfo=timezone.utc),
        result={"ok": True, "prune": prune_outcome},
    )
    for in_flight in (BackupRun.Status.QUEUED, BackupRun.Status.RUNNING):
        BackupRun.objects.create(host=web_host, status=in_flight)
    BackupRun.objects.create(
        host=web_host,
        run_type=BackupRun.RunType.MANUAL,
        status=BackupRun.Status.FAILED,
        started_at=datetime(2026, 5, 19, 1, 15, tzinfo=timezone.utc),
    )

    page = self.client.get(reverse("dashboard"))

    self.assertEqual(page.status_code, 200)
    expected_fragments = (
        "Dashboard",
        "web-01",
        "20260519-021500Z__ABCDEFGH",
        "success",
        "Last Good Backup",
        "Latest Issue",
        f"Run {good_run.id}",
        f"Run {warned_run.id}",
        "warning",
        "manual",
        "scheduled",
        "Backup activity",
        "Snapshot health",
        "queued 1",
        "running 1",
        "warning 1",
        "failed 1",
        "Operational Status",
        "1 failed run needs review.",
        "1 run completed with warnings.",
        "1 backup run in progress.",
        "1 backup run waiting for the worker.",
    )
    for fragment in expected_fragments:
        self.assertContains(page, fragment)
def test_dashboard_renders_backup_trend_summary(self) -> None:
    """Dashboard shows the Backup Trends section for a successful run with stats."""
    self.client.force_login(self.staff_user)
    GlobalConfig.objects.create(name="default", backup_root="/missing-backup-root")
    web_host = HostConfig.objects.create(host="web-01", address="web-01.example.test")
    good_snapshot = self._snapshot(web_host, "20260519-021500Z__ABCDEFGH")
    # NOTE(review): 4000 matched vs 1000 literal bytes is presumably what
    # yields the "80.0%" savings figure asserted below - confirm in the view.
    transfer_stats = {
        "files_total": 100,
        "literal_data_bytes": 1000,
        "matched_data_bytes": 4000,
    }
    capacity = {"available_bytes": 10_000, "used_percent": 25.0}
    run_result = {
        "ok": True,
        "dry_run": False,
        "stats": {
            "duration_seconds": 30,
            "rsync": transfer_stats,
            "storage": {"capacity": capacity},
        },
    }
    good_run = BackupRun.objects.create(
        host=web_host,
        run_type=BackupRun.RunType.MANUAL,
        status=BackupRun.Status.SUCCESS,
        snapshot=good_snapshot,
        started_at=datetime(2026, 5, 19, 2, 15, tzinfo=timezone.utc),
        result=run_result,
    )
    ScheduleConfig.objects.create(host=web_host, cron_expr="* * * * *", enabled=True)

    page = self.client.get(reverse("dashboard"))

    self.assertEqual(page.status_code, 200)
    expected_fragments = (
        "Backup Trends",
        "Storage Used",
        "Runway",
        "New Data",
        "Link-Dest Savings",
        "80.0%",
        "10 days",
        "Warnings",
        "Queued",
        "Next Run",
        "UTC",
        "10",
        f"Run {good_run.id}",
        "manual",
        "1000",
    )
    for fragment in expected_fragments:
        self.assertContains(page, fragment)
def test_dashboard_explains_missing_backup_trends(self) -> None:
    """With a host but no runs, the trends section shows its empty-state text."""
    self.client.force_login(self.staff_user)
    GlobalConfig.objects.create(name="default", backup_root="/opt/pobsync/backups")
    HostConfig.objects.create(host="web-01", address="web-01.example.test")

    page = self.client.get(reverse("dashboard"))

    self.assertEqual(page.status_code, 200)
    for fragment in (
        "Backup Trends",
        "No completed backup runs with stats yet.",
        "growth estimates",
    ):
        self.assertContains(page, fragment)
def test_dashboard_shows_all_clear_operational_status(self) -> None:
    """With no runs at all, Operational Status reports nothing needing attention."""
    self.client.force_login(self.staff_user)
    GlobalConfig.objects.create(name="default", backup_root="/opt/pobsync/backups")
    HostConfig.objects.create(host="web-01", address="web-01.example.test")

    page = self.client.get(reverse("dashboard"))

    self.assertEqual(page.status_code, 200)
    for fragment in (
        "Operational Status",
        "No queued, running, warning, or failed runs.",
    ):
        self.assertContains(page, fragment)
def test_dashboard_surfaces_retention_warnings(self) -> None:
    """Dashboard warns about an over-limit scheduled prune and an incomplete snapshot."""
    self.client.force_login(self.staff_user)
    # Every retention bucket is zero and the schedule allows one deletion.
    zero_retention = dict.fromkeys(
        ("retention_daily", "retention_weekly", "retention_monthly", "retention_yearly"),
        0,
    )
    web_host = HostConfig.objects.create(
        host="web-01",
        address="web-01.example.test",
        **zero_retention,
    )
    ScheduleConfig.objects.create(
        host=web_host,
        cron_expr="15 2 * * *",
        enabled=True,
        prune=True,
        prune_max_delete=1,
    )
    for dirname in (
        "20260517-021500Z__OLDSNP1",
        "20260518-021500Z__OLDSNP2",
        "20260519-021500Z__NEWSNAP",
    ):
        self._snapshot(web_host, dirname)
    broken_dirname = "20260519-031500Z__BROKEN01"
    SnapshotRecord.objects.create(
        host=web_host,
        kind=SnapshotRecord.Kind.INCOMPLETE,
        dirname=broken_dirname,
        path=f"/backups/{web_host.host}/.incomplete/{broken_dirname}",
        status="failed",
        started_at=datetime(2026, 5, 19, 3, 15, tzinfo=timezone.utc),
    )

    page = self.client.get(reverse("dashboard"))

    self.assertEqual(page.status_code, 200)
    for fragment in (
        "Scheduled prune would delete 2 snapshot(s), above max 1.",
        "1 incomplete snapshot(s) need review.",
    ):
        self.assertContains(page, fragment)
def test_dashboard_links_latest_snapshot_for_each_host(self) -> None:
    """Dashboard links to the newer snapshot's detail page and not the older one's."""

    def detail_url(snapshot) -> str:
        return reverse("snapshot_detail", args=[snapshot.id])

    self.client.force_login(self.staff_user)
    web_host = HostConfig.objects.create(host="web-01", address="web-01.example.test")
    older = self._snapshot(web_host, "20260518-021500Z__OLDSNAP")
    newest = self._snapshot(web_host, "20260519-021500Z__NEWSNAP")

    page = self.client.get(reverse("dashboard"))

    self.assertEqual(page.status_code, 200)
    self.assertContains(page, "Latest Snapshot")
    self.assertContains(page, newest.dirname)
    self.assertContains(page, detail_url(newest))
    self.assertNotContains(page, detail_url(older))
def test_dashboard_prompts_for_global_config_when_database_is_empty(self) -> None:
    """With no config rows, the dashboard offers the global config form link."""
    self.client.force_login(self.staff_user)

    page = self.client.get(reverse("dashboard"))

    self.assertEqual(page.status_code, 200)
    expected_fragments = (
        "No default global config exists yet.",
        reverse("edit_global_config"),
        "Create global config",
    )
    for fragment in expected_fragments:
        self.assertContains(page, fragment)
def test_dashboard_prompts_for_first_host_after_global_config_exists(self) -> None:
    """With a global config but no hosts, the dashboard offers the add-host link."""
    self.client.force_login(self.staff_user)
    GlobalConfig.objects.create(name="default", backup_root="/opt/pobsync/backups")

    page = self.client.get(reverse("dashboard"))

    self.assertEqual(page.status_code, 200)
    expected_fragments = (
        "Global config is ready.",
        reverse("create_host_config"),
        "Add first host",
    )
    for fragment in expected_fragments:
        self.assertContains(page, fragment)
def test_self_check_renders_runtime_checks(self) -> None:
    """The self-check page renders and names the expected runtime checks."""
    self.client.force_login(self.staff_user)

    page = self.client.get(reverse("self_check"))

    self.assertEqual(page.status_code, 200)
    for fragment in ("Self Check", "Django debug", "Database connection", "State root"):
        self.assertContains(page, fragment)
def test_logs_view_renders_filtered_journal_messages(self) -> None:
    """Logs view filters stubbed journal output and passes filters to the command.

    ``subprocess.run`` and ``shutil.which`` are patched in the views module,
    so the journal content is the three lines defined here.
    """
    self.client.force_login(self.staff_user)
    journal_lines = [
        "2026-05-19 pobsync-worker.service web-01 failed backup run 12",
        "2026-05-19 pobsync-worker.service web-02 failed backup run 12",
        "2026-05-19 pobsync-web.service web-01 started run 12",
    ]
    fake_journal = subprocess.CompletedProcess(
        args=["journalctl"],
        returncode=0,
        stdout="\n".join(journal_lines) + "\n",
        stderr="",
    )
    filters = {
        "unit": "pobsync-worker.service",
        "priority": "0..3",
        "window": "6h",
        "host": "web-01",
        "run": "12",
        "q": "failed",
    }
    with patch("pobsync_backend.views.shutil.which", return_value="/usr/bin/journalctl"):
        with patch("pobsync_backend.views.subprocess.run", return_value=fake_journal) as fake_run:
            page = self.client.get(reverse("logs"), filters)

    self.assertEqual(page.status_code, 200)
    self.assertContains(page, "Logs")
    self.assertContains(page, "web-01 failed backup run 12")
    self.assertNotContains(page, "web-02 failed backup run 12")
    self.assertNotContains(page, "started")
    # First positional argument of the patched subprocess.run call.
    invoked = fake_run.call_args.args[0]
    for token in ("-u", "pobsync-worker.service", "-p", "0..3", "--since", "6 hours ago"):
        self.assertIn(token, invoked)
def test_ssh_credentials_view_creates_key(self) -> None:
    """Posting a pasted private key stores it with the validator's public key."""
    self.client.force_login(self.staff_user)
    form_data = {
        "name": "backup-key",
        "private_key": "PRIVATE KEY",
        "public_key": "",
        "known_hosts": "web-01.example.test ssh-ed25519 AAAATEST",
        "notes": "production backup key",
    }
    # The key validator is stubbed so the placeholder key text is accepted.
    with patch("pobsync_backend.forms.validate_ssh_private_key", return_value="DERIVED PUBLIC KEY"):
        page = self.client.post(reverse("create_ssh_credential"), form_data, follow=True)

    self.assertRedirects(page, reverse("ssh_credentials"))
    for fragment in ("SSH credential saved for backup-key.", "backup-key"):
        self.assertContains(page, fragment)
    stored = SshCredential.objects.get(name="backup-key")
    # The stored private key is expected to end with a newline.
    self.assertEqual(stored.private_key, "PRIVATE KEY\n")
    self.assertEqual(stored.public_key, "DERIVED PUBLIC KEY")
def test_ssh_credentials_view_creates_key_from_uploaded_file(self) -> None:
    """An uploaded key file is stored as the private key when the text field is empty."""
    self.client.force_login(self.staff_user)
    key_upload = SimpleUploadedFile(
        "id_ed25519",
        b"UPLOADED PRIVATE KEY\n",
        content_type="text/plain",
    )
    form_data = {
        "name": "backup-key",
        "private_key_file": key_upload,
        "private_key": "",
        "public_key": "",
        "known_hosts": "",
        "notes": "uploaded",
    }
    with patch("pobsync_backend.forms.validate_ssh_private_key", return_value="DERIVED PUBLIC KEY"):
        page = self.client.post(reverse("create_ssh_credential"), form_data, follow=True)

    self.assertRedirects(page, reverse("ssh_credentials"))
    stored = SshCredential.objects.get(name="backup-key")
    self.assertEqual(stored.private_key, "UPLOADED PRIVATE KEY\n")
    self.assertEqual(stored.public_key, "DERIVED PUBLIC KEY")
def test_ssh_credentials_view_generates_filesystem_key(self) -> None:
    """Generating an ed25519 key records a key file path and no inline private key."""
    self.client.force_login(self.staff_user)
    form_data = {
        "name": "generated-key",
        "key_type": "ed25519",
        "set_global_default": "",
        "known_hosts": "",
        "notes": "generated",
    }
    with TemporaryDirectory() as scratch:
        # POBSYNC_HOME points into the temporary directory for this test; the
        # key file check below has to run before that directory is removed.
        with override_settings(POBSYNC_HOME=str(Path(scratch) / "home")):
            page = self.client.post(reverse("generate_ssh_credential"), form_data, follow=True)

            self.assertRedirects(page, reverse("ssh_credentials"))
            self.assertContains(page, "SSH key generated for generated-key.")
            stored = SshCredential.objects.get(name="generated-key")
            self.assertTrue(stored.generated)
            self.assertEqual(stored.private_key, "")
            self.assertTrue(stored.public_key.startswith("ssh-ed25519 "))
            self.assertTrue(Path(stored.key_path).exists())
def test_ssh_credentials_view_deletes_unused_generated_key(self) -> None:
    """Deleting a generated credential removes both the row and its key file."""
    self.client.force_login(self.staff_user)
    with TemporaryDirectory() as scratch:
        with override_settings(POBSYNC_HOME=str(Path(scratch) / "home")):
            generated = SshCredential.objects.create(name="generated-key")
            from pobsync_backend.ssh_keys import generate_ssh_key

            generate_ssh_key(generated)
            identity_file = Path(generated.key_path)
            delete_url = reverse("delete_ssh_credential", args=[generated.id])

            page = self.client.post(delete_url, follow=True)

            self.assertRedirects(page, reverse("ssh_credentials"))
            self.assertContains(page, "SSH key deleted: generated-key.")
            self.assertFalse(SshCredential.objects.exists())
            self.assertFalse(identity_file.exists())
def test_ssh_credentials_view_rejects_invalid_key(self) -> None:
    """Text that is not a private key re-renders the form and saves nothing."""
    self.client.force_login(self.staff_user)
    form_data = {
        "name": "bad-key",
        "private_key": "not a private key",
        "public_key": "",
        "known_hosts": "",
        "notes": "",
    }

    page = self.client.post(reverse("create_ssh_credential"), form_data)

    self.assertEqual(page.status_code, 200)
    self.assertContains(page, "Invalid SSH private key")
    self.assertFalse(SshCredential.objects.exists())
def test_ssh_credentials_view_rejects_public_key_in_private_key_field(self) -> None:
    """A public key pasted into the private key field is rejected with a hint."""
    self.client.force_login(self.staff_user)
    form_data = {
        "name": "bad-key",
        "private_key": "ssh-ed25519 AAAATEST root@backup",
        "public_key": "",
        "known_hosts": "",
        "notes": "",
    }

    page = self.client.post(reverse("create_ssh_credential"), form_data)

    self.assertEqual(page.status_code, 200)
    self.assertContains(page, "This looks like a public key")
    self.assertFalse(SshCredential.objects.exists())
def test_ssh_credentials_view_rejects_mismatched_public_key(self) -> None:
    """A submitted public key differing from the validator's result is rejected."""
    self.client.force_login(self.staff_user)
    form_data = {
        "name": "bad-key",
        "private_key": "PRIVATE KEY",
        "public_key": "ssh-ed25519 AAAAOTHER root@backup",
        "known_hosts": "",
        "notes": "",
    }
    # The stubbed validator returns a different key than the one submitted.
    with patch(
        "pobsync_backend.forms.validate_ssh_private_key",
        return_value="ssh-ed25519 AAAADERIVED derived",
    ):
        page = self.client.post(reverse("create_ssh_credential"), form_data)

    self.assertEqual(page.status_code, 200)
    self.assertContains(page, "Public key does not match")
    self.assertFalse(SshCredential.objects.exists())
def test_ssh_credentials_view_updates_existing_key(self) -> None:
    """Editing a credential replaces its private key, public key, and notes."""
    self.client.force_login(self.staff_user)
    existing = SshCredential.objects.create(name="backup-key", private_key="OLD KEY")
    form_data = {
        "name": "backup-key",
        "private_key": "UPDATED KEY",
        "public_key": "",
        "known_hosts": "",
        "notes": "rotated",
    }
    edit_url = reverse("edit_ssh_credential", args=[existing.id])
    with patch("pobsync_backend.forms.validate_ssh_private_key", return_value="UPDATED PUBLIC KEY"):
        page = self.client.post(edit_url, form_data, follow=True)

    self.assertRedirects(page, reverse("ssh_credentials"))
    existing.refresh_from_db()
    self.assertEqual(existing.private_key, "UPDATED KEY\n")
    self.assertEqual(existing.public_key, "UPDATED PUBLIC KEY")
    self.assertEqual(existing.notes, "rotated")
def test_global_config_form_creates_default_config(self) -> None:
    """Posting the global config form creates the ``default`` row.

    Multi-line text fields are expected to be stored as lists, and the
    backup root is expected to be ``/backups`` although the form does not
    submit one.
    """
    self.client.force_login(self.staff_user)
    key = SshCredential.objects.create(name="backup-key", private_key="PRIVATE KEY")
    form_data = {
        "name": "default",
        "default_ssh_credential": str(key.id),
        "ssh_user": "backup",
        "ssh_port": "2222",
        "ssh_options": "StrictHostKeyChecking=no\nBatchMode=yes",
        "rsync_binary": "rsync",
        "rsync_args": "-a\n--numeric-ids",
        "rsync_extra_args": "--delete",
        "rsync_timeout_seconds": "60",
        "rsync_bwlimit_kbps": "1000",
        "default_source_root": "/srv",
        "default_destination_subdir": "rootfs",
        "excludes_default": "*.tmp\ncache/",
        "retention_daily": "7",
        "retention_weekly": "4",
        "retention_monthly": "2",
        "retention_yearly": "1",
    }

    page = self.client.post(reverse("edit_global_config"), form_data, follow=True)

    self.assertRedirects(page, reverse("dashboard"))
    self.assertContains(page, "Global config saved for default.")
    saved = GlobalConfig.objects.get(name="default")
    expected_values = (
        ("backup_root", "/backups"),
        ("default_ssh_credential", key),
        ("ssh_user", "backup"),
        ("ssh_port", 2222),
        ("ssh_options", ["StrictHostKeyChecking=no", "BatchMode=yes"]),
        ("rsync_args", ["-a", "--numeric-ids"]),
        ("rsync_extra_args", ["--delete"]),
        ("excludes_default", ["*.tmp", "cache/"]),
        ("retention_daily", 7),
        ("retention_yearly", 1),
    )
    for field_name, expected in expected_values:
        self.assertEqual(getattr(saved, field_name), expected)
def test_global_config_form_defaults_to_first_generated_key(self) -> None:
    """A blank global config form preselects the existing credential."""
    self.client.force_login(self.staff_user)
    key = SshCredential.objects.create(
        name="default",
        key_path="/var/lib/pobsync/state/ssh-credentials/1/identity",
    )

    page = self.client.get(reverse("edit_global_config"))

    self.assertEqual(page.status_code, 200)
    expected_fragments = (
        f'value="{key.id}" selected',
        "--archive",
        "/proc/***",
    )
    for fragment in expected_fragments:
        self.assertContains(page, fragment)
def test_global_config_form_renders_static_container_backup_root_on_edit(self) -> None:
    """The edit form shows ``/backups`` and the config check section."""
    self.client.force_login(self.staff_user)
    # The stored backup root deliberately differs from the "/backups" text
    # the page is expected to contain.
    GlobalConfig.objects.create(name="default", backup_root="/mnt/pobsync/backups")

    page = self.client.get(reverse("edit_global_config"))

    self.assertEqual(page.status_code, 200)
    present = (
        "Backup root:",
        "/backups",
        "Config Check",
        "Runtime backup root",
        "Runtime state root",
    )
    absent = (
        "/opt/pobsync/backups",
        "Pobsync home",
        "Global pobsync home",
    )
    for fragment in present:
        self.assertContains(page, fragment)
    for fragment in absent:
        self.assertNotContains(page, fragment)
def test_global_config_form_renders_config_check_for_non_recursive_rsync(self) -> None:
    """Rsync args of only ``--numeric-ids`` trigger the recursion warning."""
    self.client.force_login(self.staff_user)
    GlobalConfig.objects.create(
        name="default",
        backup_root="/backups",
        rsync_args=["--numeric-ids"],
    )

    page = self.client.get(reverse("edit_global_config"))

    self.assertEqual(page.status_code, 200)
    for fragment in (
        "Global rsync recursion",
        "Rsync args do not include archive or recursive transfer.",
    ):
        self.assertContains(page, fragment)
def test_global_config_form_resets_backup_root_to_static_container_path(self) -> None:
    """Saving the form over a custom backup root leaves it set to ``/backups``."""
    self.client.force_login(self.staff_user)
    GlobalConfig.objects.create(name="default", backup_root="/mnt/pobsync/backups")
    # The submitted form carries no backup_root field.
    form_data = {
        "name": "default",
        "ssh_user": "root",
        "ssh_port": "22",
        "ssh_options": "",
        "rsync_binary": "rsync",
        "rsync_args": "",
        "rsync_extra_args": "",
        "rsync_timeout_seconds": "0",
        "rsync_bwlimit_kbps": "0",
        "default_source_root": "/",
        "default_destination_subdir": "",
        "excludes_default": "",
        "retention_daily": "14",
        "retention_weekly": "8",
        "retention_monthly": "12",
        "retention_yearly": "0",
    }

    page = self.client.post(reverse("edit_global_config"), form_data, follow=True)

    self.assertRedirects(page, reverse("dashboard"))
    saved = GlobalConfig.objects.get(name="default")
    self.assertEqual(saved.backup_root, "/backups")
def test_create_host_config_form_creates_host(self) -> None:
    """Posting the host form creates the host and redirects to its detail page."""
    self.client.force_login(self.staff_user)
    key = SshCredential.objects.create(name="host-key", private_key="PRIVATE KEY")
    form_data = {
        "host": "web-01",
        "address": "web-01.example.test",
        "enabled": "on",
        "ssh_credential": str(key.id),
        "ssh_user": "backup",
        "ssh_port": "2222",
        "source_root": "/srv",
        "includes": "/srv/www\n/srv/db",
        "excludes_add": "*.tmp",
        "excludes_replace": "",
        "rsync_extra_args": "--numeric-ids",
        "retention_daily": "7",
        "retention_weekly": "4",
        "retention_monthly": "2",
        "retention_yearly": "1",
    }

    page = self.client.post(reverse("create_host_config"), form_data, follow=True)

    self.assertRedirects(page, reverse("host_detail", args=["web-01"]))
    self.assertContains(page, "Host config created for web-01.")
    created = HostConfig.objects.get(host="web-01")
    expected_values = (
        ("address", "web-01.example.test"),
        ("ssh_credential", key),
        ("ssh_user", "backup"),
        ("includes", ["/srv/www", "/srv/db"]),
        ("excludes_add", ["*.tmp"]),
        ("rsync_extra_args", ["--numeric-ids"]),
        ("retention_weekly", 4),
    )
    for field_name, expected in expected_values:
        self.assertEqual(getattr(created, field_name), expected)
def test_create_host_config_uses_global_defaults_and_prepares_directories(self) -> None:
    """The host form is prefilled from the global config and creates host dirs."""
    self.client.force_login(self.staff_user)
    key = SshCredential.objects.create(name="global-key", private_key="PRIVATE KEY")
    with TemporaryDirectory() as scratch:
        backup_root = Path(scratch) / "backups"
        GlobalConfig.objects.create(
            name="default",
            backup_root=str(backup_root),
            default_ssh_credential=key,
            ssh_user="backup",
            ssh_port=2222,
            default_source_root="/srv",
            retention_daily=3,
            retention_weekly=2,
            retention_monthly=1,
            retention_yearly=0,
        )
        create_url = reverse("create_host_config")

        blank_form = self.client.get(create_url)
        for prefilled in ('value="backup"', 'value="2222"', 'value="/srv"'):
            self.assertContains(blank_form, prefilled)

        form_data = {
            "host": "web-01",
            "address": "web-01.example.test",
            "enabled": "on",
            "ssh_credential": str(key.id),
            "ssh_user": "backup",
            "ssh_port": "2222",
            "source_root": "/srv",
            "includes": "",
            "excludes_add": "",
            "excludes_replace": "",
            "rsync_extra_args": "",
            "retention_daily": "3",
            "retention_weekly": "2",
            "retention_monthly": "1",
            "retention_yearly": "0",
        }
        page = self.client.post(create_url, form_data, follow=True)

        self.assertRedirects(page, reverse("host_detail", args=["web-01"]))
        self.assertContains(page, "prepared")
        # Directory checks must run while the temporary directory still exists.
        for subdir in ("scheduled", "manual", ".incomplete"):
            self.assertTrue((backup_root / "web-01" / subdir).is_dir())
def test_host_detail_renders_config_schedule_runs_and_snapshots(self) -> None:
    """Host detail shows config, schedule, a run, a snapshot, and action links."""
    self.client.force_login(self.staff_user)
    with TemporaryDirectory() as scratch:
        backup_root = Path(scratch)
        GlobalConfig.objects.create(
            name="default",
            backup_root=str(backup_root),
            rsync_args=["--archive"],
        )
        web_host = HostConfig.objects.create(
            host="web-01",
            address="web-01.example.test",
            source_root="/srv",
            retention_daily=7,
        )
        host_dir = backup_root / web_host.host
        for subdir in ("scheduled", "manual", ".incomplete"):
            (host_dir / subdir).mkdir(parents=True)
        ScheduleConfig.objects.create(
            host=web_host,
            cron_expr="15 2 * * *",
            prune=True,
            last_status="success",
        )
        good_snapshot = self._snapshot(web_host, "20260519-021500Z__ABCDEFGH")
        BackupRun.objects.create(
            host=web_host,
            status=BackupRun.Status.SUCCESS,
            snapshot=good_snapshot,
        )

        page = self.client.get(reverse("host_detail", args=[web_host.host]))

        self.assertEqual(page.status_code, 200)
        expected_fragments = (
            "web-01",
            "web-01.example.test",
            "15 2 * * *",
            "Schedule expression",
            "Evaluated by the pobsync scheduler service.",
            "Next run:",
            "UTC",
            "20260519-021500Z__ABCDEFGH",
            "Discover snapshots",
            "Edit schedule",
            "Edit config",
            "Run connection preflight",
            "Backup Control",
            "Queue dry-run",
            "Queue backup",
            "Host Check",
            reverse("prepare_host_directories", args=[web_host.host]),
            "warning",
            "Snapshot Discovery",
            reverse("queue_manual_backup", args=[web_host.host]),
            # .get() also requires that exactly one run exists.
            reverse("run_detail", args=[BackupRun.objects.get().id]),
            reverse("snapshot_detail", args=[good_snapshot.id]),
        )
        for fragment in expected_fragments:
            self.assertContains(page, fragment)
def test_host_detail_renders_effective_config_preview(self) -> None:
    """Host detail shows an Effective Config built from global and host values."""
    self.client.force_login(self.staff_user)
    key = SshCredential.objects.create(
        name="default-key",
        key_path="/var/lib/pobsync/id_ed25519",
    )
    global_settings = {
        "name": "default",
        "backup_root": "/backups",
        "default_ssh_credential": key,
        "ssh_user": "root",
        "ssh_port": 2222,
        "ssh_options": ["-oBatchMode=yes"],
        "rsync_args": ["--archive", "--numeric-ids"],
        "rsync_extra_args": ["--delete"],
        "rsync_timeout_seconds": 300,
        "rsync_bwlimit_kbps": 2048,
        "default_source_root": "/",
        "excludes_default": ["/proc/***", "/sys/***"],
        "retention_daily": 14,
        "retention_weekly": 4,
        "retention_monthly": 2,
        "retention_yearly": 1,
    }
    GlobalConfig.objects.create(**global_settings)
    web_host = HostConfig.objects.create(
        host="web-01",
        address="web-01.example.test",
        includes=["/srv/www/***"],
        excludes_add=["/srv/www/cache/***"],
        rsync_extra_args=["--one-file-system"],
    )

    page = self.client.get(reverse("host_detail", args=[web_host.host]))

    self.assertEqual(page.status_code, 200)
    expected_fragments = (
        "Effective Config",
        "root@web-01.example.test:2222",
        "default-key",
        "-oBatchMode=yes",
        # Global args, global extra args, then host extra args, in that order.
        "--archive --numeric-ids --delete --one-file-system",
        "/srv/www/***",
        "/srv/www/cache/***",
        "d14",
        # NOTE(review): the global config sets weekly retention to 4, so "w8"
        # presumably comes from the host's own default - confirm in the model.
        "w8",
    )
    for fragment in expected_fragments:
        self.assertContains(page, fragment)
def test_run_host_preflight_stores_remote_check_result(self) -> None:
    """A passing preflight runs three commands and is saved on the host config."""
    self.client.force_login(self.staff_user)
    GlobalConfig.objects.create(name="default", backup_root="/backups", rsync_args=["--archive"])
    web_host = HostConfig.objects.create(host="web-01", address="web-01.example.test")
    detail_url = reverse("host_detail", args=[web_host.host])
    # Every patched subprocess call reports success with no output.
    ssh_ok = subprocess.CompletedProcess(args=["ssh"], returncode=0, stdout="", stderr="")
    with patch("pobsync_backend.preflight.subprocess.run", return_value=ssh_ok) as fake_run:
        page = self.client.post(reverse("run_host_preflight", args=[web_host.host]), follow=True)

    self.assertRedirects(page, detail_url)
    for fragment in (
        "Connection preflight passed for web-01.",
        "Connection Preflight",
        "SSH reachability",
        "Remote rsync",
        "Remote source root",
    ):
        self.assertContains(page, fragment)
    self.assertEqual(fake_run.call_count, 3)
    web_host.refresh_from_db()
    saved_preflight = web_host.config["last_preflight"]
    self.assertTrue(saved_preflight["ok"])
    self.assertEqual(saved_preflight["target"], "root@web-01.example.test")
def test_queue_manual_backup_blocks_real_backup_after_failed_remote_preflight(self) -> None:
    """Queueing a backup is refused while the stored preflight result is failed."""
    self.client.force_login(self.staff_user)
    failed_check = {
        "name": "Remote rsync",
        "ok": False,
        "exit_code": 127,
        "message": "Remote rsync failed.",
        "detail": "rsync missing",
    }
    failed_preflight = {
        "ok": False,
        "target": "root@web-01.example.test",
        "source_root": "/",
        "rsync_binary": "rsync",
        "checks": [failed_check],
    }
    with TemporaryDirectory() as scratch:
        backup_root = Path(scratch)
        GlobalConfig.objects.create(
            name="default",
            backup_root=str(backup_root),
            rsync_args=["--archive"],
        )
        web_host = HostConfig.objects.create(
            host="web-01",
            address="web-01.example.test",
            config={"last_preflight": failed_preflight},
        )
        # The host directories exist, leaving the stored preflight failure.
        host_dir = backup_root / web_host.host
        for subdir in ("scheduled", "manual", ".incomplete"):
            (host_dir / subdir).mkdir(parents=True)

        page = self.client.post(
            reverse("queue_manual_backup", args=[web_host.host]),
            {"prune_max_delete": "10"},
            follow=True,
        )

        self.assertRedirects(page, reverse("host_detail", args=[web_host.host]))
        for fragment in (
            "Cannot queue real backup until failed preflight checks are resolved",
            "Remote preflight",
        ):
            self.assertContains(page, fragment)
        self.assertFalse(BackupRun.objects.exists())
def test_host_detail_renders_backup_trends(self) -> None:
    """Host detail shows trend figures for two successful runs with stats."""

    def run_result(duration: int, files: int, literal: int, matched: int) -> dict:
        # Builds the result payload stored on each run.
        return {
            "ok": True,
            "dry_run": False,
            "stats": {
                "duration_seconds": duration,
                "rsync": {
                    "files_total": files,
                    "literal_data_bytes": literal,
                    "matched_data_bytes": matched,
                },
            },
        }

    self.client.force_login(self.staff_user)
    GlobalConfig.objects.create(name="default", backup_root="/backups")
    web_host = HostConfig.objects.create(host="web-01", address="web-01.example.test")
    good_snapshot = self._snapshot(web_host, "20260519-021500Z__ABCDEFGH")
    BackupRun.objects.create(
        host=web_host,
        run_type=BackupRun.RunType.MANUAL,
        status=BackupRun.Status.SUCCESS,
        snapshot=good_snapshot,
        started_at=datetime(2026, 5, 19, 2, 15, tzinfo=timezone.utc),
        result=run_result(45, 250, 2048, 8192),
    )
    # The earlier run has half the literal and matched bytes of the later one.
    BackupRun.objects.create(
        host=web_host,
        status=BackupRun.Status.SUCCESS,
        snapshot=good_snapshot,
        started_at=datetime(2026, 5, 18, 2, 15, tzinfo=timezone.utc),
        result=run_result(35, 150, 1024, 4096),
    )

    page = self.client.get(reverse("host_detail", args=[web_host.host]))

    self.assertEqual(page.status_code, 200)
    expected_fragments = (
        "Backup Trends",
        "Avg New Data",
        "Avg Daily New",
        "manual",
        "45s",
        "250",
        "2.0",
        "KB",
        "Run data trend",
        "width: 100%",
        "width: 50%",
    )
    for fragment in expected_fragments:
        self.assertContains(page, fragment)
def test_prepare_host_directories_action_creates_missing_directories(self) -> None:
    """The prepare action creates the host's three backup subdirectories."""
    self.client.force_login(self.staff_user)
    with TemporaryDirectory() as scratch:
        # The backup root itself does not exist before the action runs.
        backup_root = Path(scratch) / "backups"
        GlobalConfig.objects.create(name="default", backup_root=str(backup_root))
        web_host = HostConfig.objects.create(host="web-01", address="web-01.example.test")

        page = self.client.post(
            reverse("prepare_host_directories", args=[web_host.host]),
            follow=True,
        )

        self.assertRedirects(page, reverse("host_detail", args=[web_host.host]))
        self.assertContains(page, "Prepared backup directories")
        for subdir in ("scheduled", "manual", ".incomplete"):
            self.assertTrue((backup_root / web_host.host / subdir).is_dir())
def test_host_detail_warns_when_rsync_is_not_recursive(self) -> None:
    """Empty global rsync args produce the recursion warning on host detail."""
    self.client.force_login(self.staff_user)
    GlobalConfig.objects.create(name="default", backup_root="/backups", rsync_args=[])
    web_host = HostConfig.objects.create(host="web-01", address="web-01.example.test")

    page = self.client.get(reverse("host_detail", args=[web_host.host]))

    self.assertEqual(page.status_code, 200)
    for fragment in (
        "Host effective rsync recursion",
        "Rsync args do not include archive or recursive transfer.",
    ):
        self.assertContains(page, fragment)
def test_host_detail_warns_when_replace_excludes_drops_root_defaults(self) -> None:
    """A host with ``excludes_replace=[]`` gets the root-excludes warning."""
    self.client.force_login(self.staff_user)
    default_excludes = ["/proc/***", "/sys/***", "/dev/***", "/run/***", "/tmp/***"]
    GlobalConfig.objects.create(
        name="default",
        backup_root="/backups",
        rsync_args=["--archive"],
        excludes_default=default_excludes,
    )
    # An explicitly empty replacement list is set on the host.
    web_host = HostConfig.objects.create(
        host="web-01",
        address="web-01.example.test",
        excludes_replace=[],
    )

    page = self.client.get(reverse("host_detail", args=[web_host.host]))

    self.assertEqual(page.status_code, 200)
    for fragment in ("Effective root excludes", "excludes_replace is active"):
        self.assertContains(page, fragment)
def test_host_detail_warns_that_includes_are_raw_rsync_rules(self) -> None:
    """A host with includes gets the note that they are raw ``--include`` rules."""
    self.client.force_login(self.staff_user)
    GlobalConfig.objects.create(name="default", backup_root="/backups", rsync_args=["--archive"])
    web_host = HostConfig.objects.create(
        host="web-01",
        address="web-01.example.test",
        includes=["/srv/www"],
    )

    page = self.client.get(reverse("host_detail", args=[web_host.host]))

    self.assertEqual(page.status_code, 200)
    for fragment in (
        "Host includes",
        "Includes are passed to rsync as raw --include rules.",
    ):
        self.assertContains(page, fragment)
def test_scan_host_known_key_action_updates_selected_credential(self) -> None:
    # scan_known_host is patched in the views module, so no real SSH scan
    # runs; the test checks the call arguments and the stored known_hosts.
    self.client.force_login(self.staff_user)
    ssh_key = SshCredential.objects.create(
        name="default-key",
        key_path="/var/lib/pobsync/state/ssh-credentials/1/identity",
    )
    GlobalConfig.objects.create(
        name="default",
        backup_root="/backups",
        default_ssh_credential=ssh_key,
        ssh_port=2222,
    )
    web_host = HostConfig.objects.create(host="web-01", address="web-01.example.test")
    scan_url = reverse("scan_host_known_key", args=[web_host.host])

    scan_patch = patch(
        "pobsync_backend.views.scan_known_host",
        return_value="web-01.example.test ssh-ed25519 AAAASCANNED",
    )
    with scan_patch as scan_mock:
        page = self.client.post(scan_url, follow=True)

    self.assertRedirects(page, reverse("host_detail", args=[web_host.host]))
    self.assertContains(page, "Stored SSH host key for web-01")
    scan_mock.assert_called_once_with("web-01.example.test", port=2222)

    # Reload the credential to observe what the view persisted.
    ssh_key.refresh_from_db()
    self.assertEqual(ssh_key.known_hosts, "web-01.example.test ssh-ed25519 AAAASCANNED\n")
def test_host_detail_surfaces_active_backup_run(self) -> None:
    # A queued run should appear on the host page with its status, label
    # and a link to the run detail page.
    self.client.force_login(self.staff_user)
    GlobalConfig.objects.create(name="default", backup_root="/backups")
    web_host = HostConfig.objects.create(host="web-01", address="web-01.example.test")
    queued_run = BackupRun.objects.create(host=web_host, status=BackupRun.Status.QUEUED)

    page = self.client.get(reverse("host_detail", args=[web_host.host]))

    self.assertEqual(page.status_code, 200)
    expected_fragments = (
        "queued",
        f"Run {queued_run.id}",
        reverse("run_detail", args=[queued_run.id]),
    )
    for expected in expected_fragments:
        self.assertContains(page, expected)
def test_host_detail_disables_backup_controls_without_global_config(self) -> None:
    # No GlobalConfig row is created here, so the page should report the
    # missing global config.
    self.client.force_login(self.staff_user)
    web_host = HostConfig.objects.create(host="web-01", address="web-01.example.test")

    detail_url = reverse("host_detail", args=[web_host.host])
    page = self.client.get(detail_url)

    self.assertEqual(page.status_code, 200)
    for expected in (
        "missing global config",
        "Create the default global config before queueing backups.",
    ):
        self.assertContains(page, expected)
def test_host_detail_renders_discovery_status_for_existing_snapshot_dirs(self) -> None:
    # One scheduled and one incomplete snapshot directory are created on
    # disk; the host page should report both in its discovery status.
    self.client.force_login(self.staff_user)
    with TemporaryDirectory() as scratch:
        backup_root = Path(scratch) / "backups"
        GlobalConfig.objects.create(name="default", backup_root=str(backup_root))
        web_host = HostConfig.objects.create(host="web-01", address="web-01.example.test")

        host_root = backup_root / web_host.host
        scheduled_dir = host_root / "scheduled" / "20260519-021500Z__ABCDEFGH"
        incomplete_dir = host_root / ".incomplete" / "20260519-031500Z__BROKEN01"
        scheduled_dir.mkdir(parents=True)
        incomplete_dir.mkdir(parents=True)

        page = self.client.get(reverse("host_detail", args=[web_host.host]))

        self.assertEqual(page.status_code, 200)
        for expected in (
            f"Host root:</strong> {host_root}",
            "Found 2 snapshot directories",
            "scheduled 1",
            "incomplete 1",
        ):
            self.assertContains(page, expected)
def test_host_detail_returns_404_for_unknown_host(self) -> None:
    # No HostConfig named "missing-host" exists, so the view should 404.
    self.client.force_login(self.staff_user)
    missing_url = reverse("host_detail", args=["missing-host"])

    page = self.client.get(missing_url)

    self.assertEqual(page.status_code, 404)
def test_queue_manual_backup_creates_queued_run_and_redirects_to_run_detail(self) -> None:
    # Posting every option should create a QUEUED manual run whose
    # result["requested"] holds the parsed form values.
    self.client.force_login(self.staff_user)
    GlobalConfig.objects.create(name="default", backup_root="/backups", rsync_args=["--archive"])
    web_host = HostConfig.objects.create(host="web-01", address="web-01.example.test")
    form_data = {
        "dry_run": "on",
        "verbose_output": "on",
        "prune": "on",
        "prune_max_delete": "4",
        "prune_protect_bases": "on",
    }

    queue_url = reverse("queue_manual_backup", args=[web_host.host])
    page = self.client.post(queue_url, form_data, follow=True)

    queued_run = BackupRun.objects.get(host=web_host)
    self.assertRedirects(page, reverse("run_detail", args=[queued_run.id]))
    self.assertContains(page, f"Queued manual backup run {queued_run.id} for web-01.")
    self.assertEqual(queued_run.status, BackupRun.Status.QUEUED)
    self.assertEqual(queued_run.run_type, BackupRun.RunType.MANUAL)

    expected_request = {
        "dry_run": True,
        "verbose_output": True,
        "prune": True,
        "prune_max_delete": 4,
        "prune_protect_bases": True,
    }
    self.assertEqual(queued_run.result["requested"], expected_request)
def test_queue_manual_backup_quick_action_can_queue_real_backup(self) -> None:
    # The host directories exist on disk and only prune_max_delete is
    # posted, so a non-dry-run backup should be queued.
    self.client.force_login(self.staff_user)
    with TemporaryDirectory() as scratch:
        backup_root = Path(scratch)
        GlobalConfig.objects.create(name="default", backup_root=str(backup_root), rsync_args=["--archive"])
        web_host = HostConfig.objects.create(host="web-01", address="web-01.example.test")

        host_root = backup_root / web_host.host
        for name in ("scheduled", "manual", ".incomplete"):
            (host_root / name).mkdir(parents=True)

        queue_url = reverse("queue_manual_backup", args=[web_host.host])
        page = self.client.post(queue_url, {"prune_max_delete": "10"}, follow=True)

        queued_run = BackupRun.objects.get(host=web_host)
        self.assertRedirects(page, reverse("run_detail", args=[queued_run.id]))
        expected_request = {
            "dry_run": False,
            "verbose_output": False,
            "prune": False,
            "prune_max_delete": 10,
            "prune_protect_bases": False,
        }
        self.assertEqual(queued_run.result["requested"], expected_request)
def test_queue_manual_backup_blocks_real_backup_when_host_directories_are_missing(self) -> None:
    # The backup root is an empty temp dir (no host subdirectories), so a
    # real backup request should be refused and no run created.
    self.client.force_login(self.staff_user)
    with TemporaryDirectory() as scratch:
        backup_root = Path(scratch)
        GlobalConfig.objects.create(name="default", backup_root=str(backup_root), rsync_args=["--archive"])
        web_host = HostConfig.objects.create(host="web-01", address="web-01.example.test")

        queue_url = reverse("queue_manual_backup", args=[web_host.host])
        page = self.client.post(queue_url, {"prune_max_delete": "10"}, follow=True)

        self.assertRedirects(page, reverse("host_detail", args=[web_host.host]))
        for expected in (
            "Cannot queue real backup until failed preflight checks are resolved",
            "Host backup root",
        ):
            self.assertContains(page, expected)
        self.assertFalse(BackupRun.objects.exists())
def test_queue_manual_backup_allows_dry_run_with_only_storage_preflight_failures(self) -> None:
    # Same missing-directories setup as the blocked real backup, but with
    # dry_run enabled the request should still be queued.
    self.client.force_login(self.staff_user)
    with TemporaryDirectory() as scratch:
        backup_root = Path(scratch)
        GlobalConfig.objects.create(name="default", backup_root=str(backup_root), rsync_args=["--archive"])
        web_host = HostConfig.objects.create(host="web-01", address="web-01.example.test")
        form_data = {"dry_run": "on", "verbose_output": "on", "prune_max_delete": "10"}

        queue_url = reverse("queue_manual_backup", args=[web_host.host])
        page = self.client.post(queue_url, form_data, follow=True)

        queued_run = BackupRun.objects.get(host=web_host)
        self.assertRedirects(page, reverse("run_detail", args=[queued_run.id]))
        self.assertEqual(queued_run.result["requested"]["dry_run"], True)
def test_queue_manual_backup_requires_default_global_config(self) -> None:
    # Without any GlobalConfig the queue request should bounce back to the
    # host page with an explanatory message and create no run.
    self.client.force_login(self.staff_user)
    web_host = HostConfig.objects.create(host="web-01", address="web-01.example.test")

    queue_url = reverse("queue_manual_backup", args=[web_host.host])
    page = self.client.post(queue_url, {"dry_run": "on"}, follow=True)

    self.assertRedirects(page, reverse("host_detail", args=[web_host.host]))
    self.assertContains(page, "Create the default global config before queueing backups.")
    self.assertFalse(BackupRun.objects.exists())
def test_queue_manual_backup_rejects_disabled_host(self) -> None:
    # The host is created with enabled=False; queueing should be refused
    # and no BackupRun row should exist afterwards.
    self.client.force_login(self.staff_user)
    GlobalConfig.objects.create(name="default", backup_root="/backups", rsync_args=["--archive"])
    disabled_host = HostConfig.objects.create(
        host="web-01",
        address="web-01.example.test",
        enabled=False,
    )

    queue_url = reverse("queue_manual_backup", args=[disabled_host.host])
    page = self.client.post(queue_url, {"dry_run": "on"}, follow=True)

    self.assertRedirects(page, reverse("host_detail", args=[disabled_host.host]))
    self.assertContains(page, "Cannot queue backup for disabled host web-01.")
    self.assertFalse(BackupRun.objects.exists())
def test_queue_manual_backup_requires_post(self) -> None:
    # A GET against the queue endpoint should be answered with 405.
    self.client.force_login(self.staff_user)
    web_host = HostConfig.objects.create(host="web-01", address="web-01.example.test")
    queue_url = reverse("queue_manual_backup", args=[web_host.host])

    page = self.client.get(queue_url)

    self.assertEqual(page.status_code, 405)
def test_run_detail_renders_result_payload(self) -> None:
    # Builds a successful dry-run result with command, log tail, requested
    # options and stats, then checks each section appears on the run page.
    self.client.force_login(self.staff_user)
    web_host = HostConfig.objects.create(host="web-01", address="web-01.example.test")
    snap = self._snapshot(web_host, "20260519-021500Z__ABCDEFGH")

    rsync_section = {
        "command": ["rsync", "--archive", "root@web-01:/", snap.path],
        "exit_code": 0,
        "log_tail": ["sending incremental file list", "sent 500 bytes"],
    }
    requested_section = {
        "dry_run": True,
        "verbose_output": True,
        "prune": False,
        "prune_max_delete": 10,
        "prune_protect_bases": False,
    }
    stats_section = {
        "duration_seconds": 12,
        "rsync": {
            "files_total": 10,
            "files_transferred": 2,
            "total_file_size_bytes": 2000,
            "total_transferred_file_size_bytes": 500,
            "literal_data_bytes": 500,
            "matched_data_bytes": 1500,
            "link_dest_estimated_savings_bytes": 1500,
        },
    }
    finished_run = BackupRun.objects.create(
        host=web_host,
        status=BackupRun.Status.SUCCESS,
        snapshot=snap,
        snapshot_path=snap.path,
        base_path="/backups/web-01/scheduled/base",
        rsync_exit_code=0,
        result={
            "ok": True,
            "snapshot": snap.path,
            "rsync": rsync_section,
            "requested": requested_section,
            "stats": stats_section,
        },
    )

    page = self.client.get(reverse("run_detail", args=[finished_run.id]))

    self.assertEqual(page.status_code, 200)
    expected_fragments = (
        "Run",
        "web-01",
        "success",
        "ABCDEFGH",
        "Requested Options",
        "Dry run:</strong> yes",
        "Verbose rsync output:</strong> yes",
        "Rsync Command",
        "--archive",
        "Rsync Log",
        "sending incremental file list",
        "Dry Run Summary",
        "Files Seen",
        "Would Transfer",
        "Transfer Estimate",
        "Warnings:</strong> none recorded",
        "Stats",
        "Files seen:</strong> 10",
        "Estimated link-dest saving",
        # The raw JSON payload is rendered HTML-escaped.
        "&quot;ok&quot;: true",
        reverse("snapshot_detail", args=[snap.id]),
    )
    for expected in expected_fragments:
        self.assertContains(page, expected)
def test_run_detail_surfaces_dry_run_warnings_and_log_link(self) -> None:
    # A failed dry run with a log file on disk: the run page should show
    # the summary, failure details, log tail and a link to the full log.
    self.client.force_login(self.staff_user)
    web_host = HostConfig.objects.create(host="web-01", address="web-01.example.test")
    with TemporaryDirectory() as scratch:
        log_file = Path(scratch) / "dry-run" / "rsync.log"
        log_file.parent.mkdir(parents=True)
        log_file.write_text("WARNING: noisy shell output\npermission denied\n", encoding="utf-8")

        failure_section = {
            "category": "transport",
            "message": "Rsync transport failed.",
            "hint": "Check SSH access.",
        }
        stats_section = {
            "duration_seconds": 4,
            "rsync": {
                "files_total": 25,
                "files_transferred": 3,
                "total_file_size_bytes": 10_000,
                "total_transferred_file_size_bytes": 1_500,
            },
        }
        rsync_section = {
            "exit_code": 255,
            "log_tail": ["WARNING: noisy shell output", "permission denied"],
        }
        failed_run = BackupRun.objects.create(
            host=web_host,
            status=BackupRun.Status.FAILED,
            rsync_exit_code=255,
            result={
                "ok": False,
                "dry_run": True,
                "log": str(log_file),
                "failure": failure_section,
                "stats": stats_section,
                "rsync": rsync_section,
            },
        )

        page = self.client.get(reverse("run_detail", args=[failed_run.id]))

        self.assertEqual(page.status_code, 200)
        expected_fragments = (
            "Dry Run Summary",
            "failed",
            "Files Seen",
            "25",
            "Would Transfer",
            "3",
            "1.5",
            "Open full rsync log",
            reverse("run_rsync_log", args=[failed_run.id]),
            "Rsync transport failed.",
            "Check SSH access.",
            "WARNING: noisy shell output",
        )
        for expected in expected_fragments:
            self.assertContains(page, expected)
def test_run_detail_links_existing_rsync_log(self) -> None:
    # The run's result points at a real log file; the detail page should
    # link to it and the log endpoint should stream its exact contents.
    self.client.force_login(self.staff_user)
    web_host = HostConfig.objects.create(host="web-01", address="web-01.example.test")
    with TemporaryDirectory() as scratch:
        log_file = Path(scratch) / "snapshot" / "meta" / "rsync.log"
        log_file.parent.mkdir(parents=True)
        log_file.write_text("old line\nrsync log line\n", encoding="utf-8")
        # snapshot_path is the directory two levels above the log file.
        snapshot_dir = log_file.parent.parent
        logged_run = BackupRun.objects.create(
            host=web_host,
            status=BackupRun.Status.SUCCESS,
            snapshot_path=str(snapshot_dir),
            result={"ok": True, "log": str(log_file)},
        )
        log_url = reverse("run_rsync_log", args=[logged_run.id])

        page = self.client.get(reverse("run_detail", args=[logged_run.id]))
        log_page = self.client.get(log_url)
        # Drain the streamed body while the temporary file still exists.
        streamed = b"".join(log_page.streaming_content)

        for expected in (log_url, str(log_file), "rsync log line"):
            self.assertContains(page, expected)
        self.assertEqual(log_page.status_code, 200)
        self.assertEqual(streamed, b"old line\nrsync log line\n")
def test_run_detail_surfaces_failure_and_retention_warning(self) -> None:
    # A WARNING run carrying both a failure block and a prune block should
    # render the Failure and Retention sections on the run page.
    self.client.force_login(self.staff_user)
    web_host = HostConfig.objects.create(host="web-01", address="web-01.example.test")
    failure_section = {
        "category": "transport",
        "summary": "SSH connection dropped.",
        "hint": "Check network connectivity.",
    }
    prune_section = {
        "ok": True,
        "source": "sql",
        "kind": "scheduled",
        "planned_delete_count": 1,
        "max_delete": 1,
        "protect_bases": True,
        "incomplete_ignored_count": 1,
        "deleted": [{"dirname": "20260518-021500Z__OLD"}],
        "actions": ["deleted scheduled 20260518-021500Z__OLD"],
    }
    warning_run = BackupRun.objects.create(
        host=web_host,
        status=BackupRun.Status.WARNING,
        rsync_exit_code=0,
        result={"ok": True, "failure": failure_section, "prune": prune_section},
    )

    page = self.client.get(reverse("run_detail", args=[warning_run.id]))

    self.assertEqual(page.status_code, 200)
    expected_fragments = (
        "Failure",
        "transport",
        "Check network connectivity.",
        "Retention",
        "Planned deletions",
        "Max delete",
        "Protect bases",
        "Incomplete ignored",
        "deleted scheduled 20260518-021500Z__OLD",
    )
    for expected in expected_fragments:
        self.assertContains(page, expected)
def test_run_detail_surfaces_host_retention_warnings(self) -> None:
    # Zero retention with three snapshots, a schedule limited to one
    # deletion, and one incomplete snapshot: the run page should warn.
    self.client.force_login(self.staff_user)
    web_host = HostConfig.objects.create(
        host="web-01",
        address="web-01.example.test",
        retention_daily=0,
        retention_weekly=0,
        retention_monthly=0,
        retention_yearly=0,
    )
    ScheduleConfig.objects.create(
        host=web_host,
        cron_expr="15 2 * * *",
        enabled=True,
        prune=True,
        prune_max_delete=1,
    )
    for dirname in (
        "20260517-021500Z__OLDSNP1",
        "20260518-021500Z__OLDSNP2",
        "20260519-021500Z__NEWSNAP",
    ):
        self._snapshot(web_host, dirname)
    SnapshotRecord.objects.create(
        host=web_host,
        kind=SnapshotRecord.Kind.INCOMPLETE,
        dirname="20260519-031500Z__BROKEN01",
        path=f"/backups/{web_host.host}/.incomplete/20260519-031500Z__BROKEN01",
        status="failed",
        started_at=datetime(2026, 5, 19, 3, 15, tzinfo=timezone.utc),
    )
    plain_run = BackupRun.objects.create(
        host=web_host,
        status=BackupRun.Status.SUCCESS,
        result={"ok": True},
    )

    page = self.client.get(reverse("run_detail", args=[plain_run.id]))

    self.assertEqual(page.status_code, 200)
    for expected in (
        "Retention Warnings",
        "Scheduled pruning for this host would delete 2 snapshot(s)",
        "1 incomplete snapshot(s) exist for this host",
    ):
        self.assertContains(page, expected)
def test_run_detail_infers_rsync_log_from_snapshot_path(self) -> None:
    # The result has no "log" key; only snapshot_path is set. The log
    # endpoint is still expected to serve <snapshot>/meta/rsync.log.
    self.client.force_login(self.staff_user)
    web_host = HostConfig.objects.create(host="web-01", address="web-01.example.test")
    with TemporaryDirectory() as scratch:
        snapshot_dir = Path(scratch) / "snapshot"
        meta_dir = snapshot_dir / "meta"
        meta_dir.mkdir(parents=True)
        (meta_dir / "rsync.log").write_text("scheduled log\n", encoding="utf-8")
        scheduled_run = BackupRun.objects.create(
            host=web_host,
            status=BackupRun.Status.SUCCESS,
            snapshot_path=str(snapshot_dir),
            result={"ok": True},
        )

        log_page = self.client.get(reverse("run_rsync_log", args=[scheduled_run.id]))
        # Drain the streamed body while the temporary file still exists.
        streamed = b"".join(log_page.streaming_content)

        self.assertEqual(log_page.status_code, 200)
        self.assertEqual(streamed, b"scheduled log\n")
def test_run_detail_offers_cancel_for_running_run(self) -> None:
    # A RUNNING run's page should expose the cancel action and its URL.
    self.client.force_login(self.staff_user)
    web_host = HostConfig.objects.create(host="web-01", address="web-01.example.test")
    active_run = BackupRun.objects.create(host=web_host, status=BackupRun.Status.RUNNING)

    page = self.client.get(reverse("run_detail", args=[active_run.id]))

    self.assertEqual(page.status_code, 200)
    for expected in ("Cancel run", reverse("cancel_run", args=[active_run.id])):
        self.assertContains(page, expected)
def test_cancel_run_marks_queued_run_cancelled(self) -> None:
    # Cancelling a QUEUED run is expected to set CANCELLED, stamp ended_at
    # and record the previous status in the result payload.
    self.client.force_login(self.staff_user)
    web_host = HostConfig.objects.create(host="web-01", address="web-01.example.test")
    queued_run = BackupRun.objects.create(host=web_host, status=BackupRun.Status.QUEUED)

    cancel_url = reverse("cancel_run", args=[queued_run.id])
    page = self.client.post(cancel_url, follow=True)

    self.assertRedirects(page, reverse("run_detail", args=[queued_run.id]))
    self.assertContains(page, "Cancellation requested")

    queued_run.refresh_from_db()
    self.assertEqual(queued_run.status, BackupRun.Status.CANCELLED)
    self.assertIsNotNone(queued_run.ended_at)
    cancellation = queued_run.result["cancellation"]
    self.assertEqual(cancellation["previous_status"], BackupRun.Status.QUEUED)
def test_cancel_run_requests_running_run_cancellation(self) -> None:
    # Cancelling a RUNNING run is expected to set CANCELLED but leave
    # ended_at unset, while recording the previous status.
    self.client.force_login(self.staff_user)
    web_host = HostConfig.objects.create(host="web-01", address="web-01.example.test")
    active_run = BackupRun.objects.create(host=web_host, status=BackupRun.Status.RUNNING)

    cancel_url = reverse("cancel_run", args=[active_run.id])
    page = self.client.post(cancel_url, follow=True)

    self.assertRedirects(page, reverse("run_detail", args=[active_run.id]))

    active_run.refresh_from_db()
    self.assertEqual(active_run.status, BackupRun.Status.CANCELLED)
    self.assertIsNone(active_run.ended_at)
    cancellation = active_run.result["cancellation"]
    self.assertEqual(cancellation["previous_status"], BackupRun.Status.RUNNING)
def test_snapshot_detail_renders_metadata_runs_and_children(self) -> None:
    # A base snapshot with stats metadata, one child snapshot and one run:
    # the detail page should show stats, restore guidance and the links.
    self.client.force_login(self.staff_user)
    web_host = HostConfig.objects.create(host="web-01", address="web-01.example.test")

    base_snap = self._snapshot(web_host, "20260518-021500Z__BASESNAP")
    rsync_stats = {
        "files_total": 100,
        "files_transferred": 4,
        "total_file_size_bytes": 10_000,
        "link_dest_estimated_savings_bytes": 7_000,
    }
    storage_stats = {
        "snapshot": {
            "apparent_size_bytes": 10_000,
            "allocated_size_bytes": 3_000,
            "hardlinked_files": 9,
        },
        "capacity": {
            "used_percent": 30.5,
            "available_bytes": 1_000_000,
        },
    }
    base_snap.metadata = {
        "status": "success",
        "snapshot_id": "BASESNAP",
        "stats": {
            "duration_seconds": 20,
            "rsync": rsync_stats,
            "storage": storage_stats,
        },
    }
    base_snap.save(update_fields=["metadata"])

    child_snap = self._snapshot(web_host, "20260519-021500Z__CHILDSNP")
    child_snap.base = base_snap
    child_snap.base_dirname = base_snap.dirname
    child_snap.save(update_fields=["base", "base_dirname"])

    linked_run = BackupRun.objects.create(
        host=web_host,
        status=BackupRun.Status.SUCCESS,
        snapshot=base_snap,
    )

    page = self.client.get(reverse("snapshot_detail", args=[base_snap.id]))

    self.assertEqual(page.status_code, 200)
    expected_fragments = (
        base_snap.dirname,
        "BASESNAP",
        "Stats",
        "Files seen:</strong> 100",
        "Hardlinked files:</strong> 9",
        "Restore Guidance",
        f"{base_snap.path}/data",
        f"/restore/{web_host.host}",
        "rsync -aHAX --numeric-ids --info=progress2 --dry-run",
        f"{base_snap.path}/data/",
        "root@web-01.example.test:/",
        "Dry-run a directory restore",
        f"{base_snap.path}/data/etc/nginx/",
        f"/restore/{web_host.host}/etc/nginx/",
        "Dry-run a single file restore",
        f"{base_snap.path}/data/home/example/site/public_html/index.php",
        f"/restore/{web_host.host}/home/example/site/public_html/index.php",
        "Treat snapshot directories as read-only",
        child_snap.dirname,
        f"Run {linked_run.id}",
        reverse("run_detail", args=[linked_run.id]),
        reverse("snapshot_detail", args=[child_snap.id]),
    )
    for expected in expected_fragments:
        self.assertContains(page, expected)
def test_discover_host_snapshots_action_discovers_and_redirects(self) -> None:
    # One scheduled snapshot with a meta.yaml exists on disk; posting the
    # discover action should create a matching SnapshotRecord.
    self.client.force_login(self.staff_user)
    with TemporaryDirectory() as scratch:
        backup_root = Path(scratch) / "backups"
        GlobalConfig.objects.create(name="default", backup_root=str(backup_root))
        web_host = HostConfig.objects.create(host="web-01", address="web-01.example.test")

        snapshot_dir = backup_root / web_host.host / "scheduled" / "20260519-021500Z__ABCDEFGH"
        meta_file = snapshot_dir / "meta" / "meta.yaml"
        meta_file.parent.mkdir(parents=True)
        write_yaml_atomic(meta_file, {"status": "success", "started_at": "2026-05-19T02:15:00Z"})

        discover_url = reverse("discover_host_snapshots", args=[web_host.host])
        page = self.client.post(discover_url, follow=True)

        self.assertRedirects(page, reverse("host_detail", args=[web_host.host]))
        self.assertContains(page, "Snapshot discovery scanned 1 items")
        discovered = SnapshotRecord.objects.filter(host=web_host, dirname=snapshot_dir.name)
        self.assertTrue(discovered.exists())
def test_discover_host_snapshots_warns_when_host_root_is_missing(self) -> None:
    # The backup root is never created on disk, so discovery should scan
    # nothing, warn about the missing directory and create no records.
    self.client.force_login(self.staff_user)
    with TemporaryDirectory() as scratch:
        backup_root = Path(scratch) / "backups"
        GlobalConfig.objects.create(name="default", backup_root=str(backup_root))
        web_host = HostConfig.objects.create(host="web-01", address="web-01.example.test")

        discover_url = reverse("discover_host_snapshots", args=[web_host.host])
        page = self.client.post(discover_url, follow=True)

        self.assertRedirects(page, reverse("host_detail", args=[web_host.host]))
        for expected in (
            "Snapshot discovery scanned 0 items",
            "Host backup directory does not exist yet",
        ):
            self.assertContains(page, expected)
        self.assertFalse(SnapshotRecord.objects.exists())
def test_discover_host_snapshots_requires_post(self) -> None:
    # A GET against the discover endpoint should be answered with 405.
    self.client.force_login(self.staff_user)
    web_host = HostConfig.objects.create(host="web-01", address="web-01.example.test")
    discover_url = reverse("discover_host_snapshots", args=[web_host.host])

    page = self.client.get(discover_url)

    self.assertEqual(page.status_code, 405)
def test_retention_plan_renders_keep_and_delete_sets(self) -> None:
    # With all retention counters at zero and two snapshots, the plan page
    # should list both and ask to confirm a delete count of 1.
    self.client.force_login(self.staff_user)
    web_host = HostConfig.objects.create(
        host="web-01",
        address="web-01.example.test",
        retention_daily=0,
        retention_weekly=0,
        retention_monthly=0,
        retention_yearly=0,
    )
    older = self._snapshot(web_host, "20260518-021500Z__OLDSNAP")
    newer = self._snapshot(web_host, "20260519-021500Z__NEWSNAP")

    plan_url = reverse("host_retention_plan", args=[web_host.host])
    page = self.client.get(plan_url)

    self.assertEqual(page.status_code, 200)
    expected_fragments = (
        "Retention Plan: web-01",
        older.dirname,
        newer.dirname,
        "newest",
        "Would Delete",
        "outside retention policy",
        "Confirm delete count",
        "Type 1 to confirm the current number of planned deletions.",
    )
    for expected in expected_fragments:
        self.assertContains(page, expected)
def test_retention_plan_warns_when_scheduled_prune_limit_is_exceeded(self) -> None:
    # The schedule allows one deletion per prune; with three snapshots and
    # zero retention the plan page should report two planned deletions.
    self.client.force_login(self.staff_user)
    web_host = HostConfig.objects.create(
        host="web-01",
        address="web-01.example.test",
        retention_daily=0,
        retention_weekly=0,
        retention_monthly=0,
        retention_yearly=0,
    )
    ScheduleConfig.objects.create(
        host=web_host,
        cron_expr="15 2 * * *",
        enabled=True,
        prune=True,
        prune_max_delete=1,
    )
    for dirname in (
        "20260517-021500Z__OLDSNP1",
        "20260518-021500Z__OLDSNP2",
        "20260519-021500Z__NEWSNAP",
    ):
        self._snapshot(web_host, dirname)

    page = self.client.get(reverse("host_retention_plan", args=[web_host.host]))

    self.assertEqual(page.status_code, 200)
    for expected in (
        "Scheduled Prune Limit",
        "would delete 2 snapshot(s)",
        "scheduled prune limit of",
        "Schedule max delete:</strong> 1",
    ):
        self.assertContains(page, expected)
def test_retention_plan_can_enable_base_protection(self) -> None:
    # Requesting the plan with protect_bases=1 should mark the base of the
    # child snapshot as protected ("base-of:<child>").
    self.client.force_login(self.staff_user)
    web_host = HostConfig.objects.create(
        host="web-01",
        address="web-01.example.test",
        retention_daily=0,
        retention_weekly=0,
        retention_monthly=0,
        retention_yearly=0,
    )
    base_snap = self._snapshot(web_host, "20260518-021500Z__BASESNAP")
    child_snap = self._snapshot(web_host, "20260519-021500Z__CHILDSNP")
    child_snap.base = base_snap
    child_snap.save(update_fields=["base"])

    plan_url = reverse("host_retention_plan", args=[web_host.host])
    page = self.client.get(plan_url, {"protect_bases": "1"})

    self.assertEqual(page.status_code, 200)
    for expected in (
        "Protect bases:</strong> yes",
        "Base snapshots referenced by kept snapshots",
        f"base-of:{child_snap.dirname}",
    ):
        self.assertContains(page, expected)
def test_retention_plan_surfaces_incomplete_snapshots_without_deleting_them(self) -> None:
    # An INCOMPLETE snapshot record should be listed on the plan page and
    # described as excluded from retention cleanup.
    self.client.force_login(self.staff_user)
    web_host = HostConfig.objects.create(
        host="web-01",
        address="web-01.example.test",
        retention_daily=0,
        retention_weekly=0,
        retention_monthly=0,
        retention_yearly=0,
    )
    for dirname in ("20260518-021500Z__OLDSNAP", "20260519-021500Z__NEWSNAP"):
        self._snapshot(web_host, dirname)
    SnapshotRecord.objects.create(
        host=web_host,
        kind=SnapshotRecord.Kind.INCOMPLETE,
        dirname="20260519-031500Z__BROKEN01",
        path=f"/backups/{web_host.host}/.incomplete/20260519-031500Z__BROKEN01",
        status="failed",
        started_at=datetime(2026, 5, 19, 3, 15, tzinfo=timezone.utc),
    )

    page = self.client.get(reverse("host_retention_plan", args=[web_host.host]))

    self.assertEqual(page.status_code, 200)
    for expected in (
        "Incomplete Snapshots",
        "20260519-031500Z__BROKEN01",
        "excluded from retention cleanup",
    ):
        self.assertContains(page, expected)
def test_host_detail_surfaces_retention_warnings(self) -> None:
    # Same over-limit setup as the retention plan test, but checked on the
    # host detail page.
    self.client.force_login(self.staff_user)
    web_host = HostConfig.objects.create(
        host="web-01",
        address="web-01.example.test",
        retention_daily=0,
        retention_weekly=0,
        retention_monthly=0,
        retention_yearly=0,
    )
    ScheduleConfig.objects.create(
        host=web_host,
        cron_expr="15 2 * * *",
        enabled=True,
        prune=True,
        prune_max_delete=1,
    )
    for dirname in (
        "20260517-021500Z__OLDSNP1",
        "20260518-021500Z__OLDSNP2",
        "20260519-021500Z__NEWSNAP",
    ):
        self._snapshot(web_host, dirname)

    page = self.client.get(reverse("host_detail", args=[web_host.host]))

    self.assertEqual(page.status_code, 200)
    for expected in (
        "Retention Warnings",
        "Scheduled pruning would delete 2 snapshot(s), above max delete",
    ):
        self.assertContains(page, expected)
def test_retention_plan_rejects_invalid_kind(self) -> None:
    # An unknown ?kind= value should redirect to the host page with an
    # error message.
    self.client.force_login(self.staff_user)
    web_host = HostConfig.objects.create(host="web-01", address="web-01.example.test")

    plan_url = reverse("host_retention_plan", args=[web_host.host])
    page = self.client.get(plan_url, {"kind": "weird"}, follow=True)

    self.assertRedirects(page, reverse("host_detail", args=[web_host.host]))
    self.assertContains(page, "Retention kind must be scheduled, manual, or all.")
def test_retention_apply_deletes_planned_snapshot_after_confirmation(self) -> None:
    # Two snapshot directories exist on disk with matching records; a
    # correctly confirmed apply should remove only the older one, both on
    # disk and in the database.
    self.client.force_login(self.staff_user)
    with TemporaryDirectory() as scratch:
        pobsync_home = Path(scratch) / "home"
        web_host = HostConfig.objects.create(
            host="web-01",
            address="web-01.example.test",
            retention_daily=0,
            retention_weekly=0,
            retention_monthly=0,
            retention_yearly=0,
        )
        scheduled_root = Path(scratch) / "backups" / web_host.host / "scheduled"
        old_dir = scheduled_root / "20260518-021500Z__OLDSNAP"
        new_dir = scheduled_root / "20260519-021500Z__NEWSNAP"
        old_dir.mkdir(parents=True)
        new_dir.mkdir(parents=True)

        # Point each record at its real directory so deletion hits disk.
        old_record = self._snapshot(web_host, old_dir.name)
        old_record.path = str(old_dir)
        old_record.save(update_fields=["path"])
        new_record = self._snapshot(web_host, new_dir.name)
        new_record.path = str(new_dir)
        new_record.save(update_fields=["path"])

        confirmation = {
            "kind": "scheduled",
            "max_delete": "1",
            "confirm_host": web_host.host,
            "confirm_delete_count": "1",
        }
        apply_url = reverse("apply_host_retention", args=[web_host.host])
        with override_settings(POBSYNC_HOME=str(pobsync_home)):
            page = self.client.post(apply_url, confirmation, follow=True)

        self.assertFalse(old_dir.exists())
        self.assertTrue(new_dir.exists())
        plan_url = reverse("host_retention_plan", args=[web_host.host])
        self.assertRedirects(page, f"{plan_url}?kind=scheduled")
        self.assertContains(page, "Retention deleted 1 snapshot(s) for web-01.")
        self.assertFalse(SnapshotRecord.objects.filter(pk=old_record.pk).exists())
        self.assertTrue(SnapshotRecord.objects.filter(pk=new_record.pk).exists())
def test_retention_apply_rejects_bad_confirmation(self) -> None:
    # confirm_host does not match the host name, so nothing may be deleted.
    self.client.force_login(self.staff_user)
    web_host = HostConfig.objects.create(host="web-01", address="web-01.example.test")
    self._snapshot(web_host, "20260518-021500Z__OLDSNAP")
    bad_confirmation = {
        "kind": "scheduled",
        "max_delete": "1",
        "confirm_host": "wrong",
        "confirm_delete_count": "1",
    }

    apply_url = reverse("apply_host_retention", args=[web_host.host])
    page = self.client.post(apply_url, bad_confirmation, follow=True)

    self.assertRedirects(page, reverse("host_retention_plan", args=[web_host.host]))
    self.assertContains(page, "Retention apply confirmation is invalid.")
    self.assertEqual(SnapshotRecord.objects.count(), 1)
def test_retention_apply_rejects_mismatched_delete_count_confirmation(self) -> None:
    # The host name is confirmed correctly but confirm_delete_count is "0";
    # the apply should be rejected and both snapshots kept.
    self.client.force_login(self.staff_user)
    web_host = HostConfig.objects.create(
        host="web-01",
        address="web-01.example.test",
        retention_daily=0,
        retention_weekly=0,
        retention_monthly=0,
        retention_yearly=0,
    )
    for dirname in ("20260518-021500Z__OLDSNAP", "20260519-021500Z__NEWSNAP"):
        self._snapshot(web_host, dirname)
    mismatched = {
        "kind": "scheduled",
        "max_delete": "1",
        "confirm_host": web_host.host,
        "confirm_delete_count": "0",
    }

    apply_url = reverse("apply_host_retention", args=[web_host.host])
    page = self.client.post(apply_url, mismatched, follow=True)

    self.assertRedirects(page, reverse("host_retention_plan", args=[web_host.host]))
    self.assertContains(page, "Retention apply confirmation is invalid.")
    self.assertEqual(SnapshotRecord.objects.count(), 2)
def test_retention_apply_requires_post(self) -> None:
    # A GET against the apply endpoint should be answered with 405.
    self.client.force_login(self.staff_user)
    web_host = HostConfig.objects.create(host="web-01", address="web-01.example.test")
    apply_url = reverse("apply_host_retention", args=[web_host.host])

    page = self.client.get(apply_url)

    self.assertEqual(page.status_code, 405)
def test_schedule_form_renders_defaults_for_new_schedule(self) -> None:
    # The host has no ScheduleConfig yet; the form should render in
    # "create" mode and contain the cron expression "15 2 * * *".
    self.client.force_login(self.staff_user)
    web_host = HostConfig.objects.create(host="web-01", address="web-01.example.test")

    page = self.client.get(reverse("edit_host_schedule", args=[web_host.host]))

    self.assertEqual(page.status_code, 200)
    for expected in (
        "Create Schedule",
        "Schedule expression",
        "evaluated by the pobsync scheduler service",
        "15 2 * * *",
        "Save schedule",
    ):
        self.assertContains(page, expected)
def test_schedule_form_creates_schedule(self) -> None:
    # Posting a complete form should create the ScheduleConfig with every
    # submitted value stored.
    self.client.force_login(self.staff_user)
    web_host = HostConfig.objects.create(host="web-01", address="web-01.example.test")
    form_data = {
        "cron_expr": "30 3 * * *",
        "enabled": "on",
        "prune": "on",
        "prune_max_delete": "4",
        "prune_protect_bases": "on",
    }

    form_url = reverse("edit_host_schedule", args=[web_host.host])
    page = self.client.post(form_url, form_data, follow=True)

    self.assertRedirects(page, reverse("host_detail", args=[web_host.host]))
    self.assertContains(page, "Schedule saved for web-01.")

    saved = ScheduleConfig.objects.get(host=web_host)
    self.assertEqual(saved.cron_expr, "30 3 * * *")
    self.assertTrue(saved.enabled)
    self.assertTrue(saved.prune)
    self.assertEqual(saved.prune_max_delete, 4)
    self.assertTrue(saved.prune_protect_bases)
def test_schedule_form_updates_existing_schedule(self) -> None:
    # The post omits "enabled" and "prune", so both flags on the existing
    # schedule are expected to end up False after the update.
    self.client.force_login(self.staff_user)
    web_host = HostConfig.objects.create(host="web-01", address="web-01.example.test")
    existing = ScheduleConfig.objects.create(
        host=web_host,
        cron_expr="15 2 * * *",
        enabled=True,
        prune=True,
    )
    form_data = {"cron_expr": "45 4 * * 1", "prune_max_delete": "8"}

    form_url = reverse("edit_host_schedule", args=[web_host.host])
    page = self.client.post(form_url, form_data, follow=True)

    self.assertRedirects(page, reverse("host_detail", args=[web_host.host]))

    existing.refresh_from_db()
    self.assertEqual(existing.cron_expr, "45 4 * * 1")
    self.assertFalse(existing.enabled)
    self.assertFalse(existing.prune)
    self.assertEqual(existing.prune_max_delete, 8)
def test_schedule_form_renders_existing_schedule_values(self) -> None:
    """GET on the schedule form for a host with a schedule shows the stored values."""
    self.client.force_login(self.staff_user)
    target = HostConfig.objects.create(host="web-01", address="web-01.example.test")
    ScheduleConfig.objects.create(
        host=target,
        cron_expr="45 4 * * 1",
        enabled=True,
        prune_max_delete=8,
    )
    form_url = reverse("edit_host_schedule", args=[target.host])

    response = self.client.get(form_url)

    self.assertEqual(response.status_code, 200)
    self.assertContains(response, "Edit Schedule")
    # Stored values must be rendered as input values (raw substring match).
    for present in ('value="45 4 * * 1"', 'value="8"'):
        self.assertContains(response, present, html=False)
    # Neither the "15 2 * * *" expression nor a ">User<" element may appear.
    for absent in ('value="15 2 * * *"', ">User<"):
        self.assertNotContains(response, absent, html=False)
def test_schedule_form_rejects_invalid_cron(self) -> None:
    """An invalid cron expression re-renders the form with an error and saves nothing."""
    self.client.force_login(self.staff_user)
    target = HostConfig.objects.create(host="web-01", address="web-01.example.test")
    form_url = reverse("edit_host_schedule", args=[target.host])
    bad_payload = {
        "cron_expr": "bad cron",
        "enabled": "on",
        "prune_max_delete": "10",
    }

    response = self.client.post(form_url, bad_payload)

    # No redirect: the form page comes back with the validation message.
    self.assertEqual(response.status_code, 200)
    self.assertContains(response, "cron expression must have exactly 5 fields")
    schedule_rows = ScheduleConfig.objects.filter(host=target)
    self.assertFalse(schedule_rows.exists())
def test_host_config_form_renders_existing_values(self) -> None:
    """GET on the host config form shows the values stored on the HostConfig."""
    self.client.force_login(self.staff_user)
    stored = HostConfig.objects.create(
        host="web-01",
        address="web-01.example.test",
        includes=["/srv"],
        excludes_add=["*.tmp"],
        rsync_extra_args=["--numeric-ids"],
    )
    form_url = reverse("edit_host_config", args=[stored.host])

    response = self.client.get(form_url)

    self.assertEqual(response.status_code, 200)
    # Page heading followed by each stored value, checked in this order.
    expected_fragments = (
        "Config: web-01",
        "web-01.example.test",
        "/srv",
        "*.tmp",
        "--numeric-ids",
    )
    for fragment in expected_fragments:
        self.assertContains(response, fragment)
def test_host_config_form_renders_effective_config_check(self) -> None:
    """The host config page shows the effective-config check and its rsync warning."""
    self.client.force_login(self.staff_user)
    # The global rsync args contain only --numeric-ids.
    GlobalConfig.objects.create(
        name="default",
        backup_root="/backups",
        rsync_args=["--numeric-ids"],
    )
    target = HostConfig.objects.create(host="web-01", address="web-01.example.test")
    form_url = reverse("edit_host_config", args=[target.host])

    response = self.client.get(form_url)

    self.assertEqual(response.status_code, 200)
    expected_fragments = (
        "Effective Config Check",
        "Host effective rsync recursion",
        "Rsync args do not include archive or recursive transfer.",
    )
    for fragment in expected_fragments:
        self.assertContains(response, fragment)
def test_host_config_form_updates_host_config(self) -> None:
    """POSTing the host config form persists every submitted field on the HostConfig.

    Multi-line textarea values are expected to be stored as lists (one entry
    per line), and an empty ``excludes_replace`` is expected to be stored as
    ``None``.
    """
    self.client.force_login(self.staff_user)
    host = HostConfig.objects.create(host="web-01", address="old.example.test")
    response = self.client.post(
        reverse("edit_host_config", args=[host.host]),
        {
            "address": "new.example.test",
            "enabled": "on",
            "ssh_user": "backup",
            "ssh_port": "2222",
            "source_root": "/srv",
            "includes": "/srv/www\n/srv/db",
            "excludes_add": "*.tmp\ncache/",
            "excludes_replace": "",
            "rsync_extra_args": "--numeric-ids\n--delete",
            "retention_daily": "7",
            "retention_weekly": "4",
            "retention_monthly": "2",
            "retention_yearly": "1",
        },
        follow=True,
    )
    self.assertRedirects(response, reverse("host_detail", args=[host.host]))
    self.assertContains(response, "Host config saved for web-01.")
    host.refresh_from_db()
    self.assertEqual(host.address, "new.example.test")
    # The "enabled" checkbox was submitted as "on", so the host must stay enabled.
    self.assertTrue(host.enabled)
    self.assertEqual(host.ssh_user, "backup")
    self.assertEqual(host.ssh_port, 2222)
    self.assertEqual(host.source_root, "/srv")
    self.assertEqual(host.includes, ["/srv/www", "/srv/db"])
    self.assertEqual(host.excludes_add, ["*.tmp", "cache/"])
    self.assertIsNone(host.excludes_replace)
    self.assertEqual(host.rsync_extra_args, ["--numeric-ids", "--delete"])
    # Check all four retention buckets, not only the first and last, so a
    # regression in the weekly/monthly fields cannot slip through.
    self.assertEqual(host.retention_daily, 7)
    self.assertEqual(host.retention_weekly, 4)
    self.assertEqual(host.retention_monthly, 2)
    self.assertEqual(host.retention_yearly, 1)
def test_host_config_form_can_replace_global_excludes(self) -> None:
    """Submitting ``excludes_replace`` lines stores them as a list on the host."""
    self.client.force_login(self.staff_user)
    target = HostConfig.objects.create(host="web-01", address="web-01.example.test")
    form_url = reverse("edit_host_config", args=[target.host])
    # "enabled" is not part of the payload; most other fields are sent blank.
    form_data = {
        "address": target.address,
        "ssh_user": "",
        "ssh_port": "",
        "source_root": "",
        "includes": "",
        "excludes_add": "",
        "excludes_replace": "*.cache\nnode_modules/",
        "rsync_extra_args": "",
        "retention_daily": "14",
        "retention_weekly": "8",
        "retention_monthly": "12",
        "retention_yearly": "0",
    }

    response = self.client.post(form_url, form_data, follow=True)

    self.assertRedirects(response, reverse("host_detail", args=[target.host]))
    target.refresh_from_db()
    # The omitted checkbox is expected to leave the host disabled.
    self.assertFalse(target.enabled)
    self.assertEqual(target.excludes_add, [])
    self.assertEqual(target.excludes_replace, ["*.cache", "node_modules/"])
def _snapshot(self, host: HostConfig, dirname: str) -> SnapshotRecord:
    """Create and return a successful scheduled SnapshotRecord for ``host``.

    The start time is parsed from the part of ``dirname`` before the first
    ``"__"`` (format ``%Y%m%d-%H%M%SZ``) and marked as UTC.
    """
    timestamp_part = dirname.partition("__")[0]
    parsed = datetime.strptime(timestamp_part, "%Y%m%d-%H%M%SZ")
    record_fields = {
        "host": host,
        "kind": SnapshotRecord.Kind.SCHEDULED,
        "dirname": dirname,
        "path": f"/backups/{host.host}/scheduled/{dirname}",
        "status": "success",
        "started_at": parsed.replace(tzinfo=timezone.utc),
    }
    return SnapshotRecord.objects.create(**record_fields)