Run post-backup pruning through SQL retention

Stop passing prune options into the legacy scheduled backup engine from the
Django backup command. Record the completed snapshot first, then apply retention
through the SQL-backed retention service so pruning sees the same SnapshotRecord
state as the admin and retention command.

Also record prune failures on BackupRun.result instead of leaving the run in an
ambiguous state.
This commit is contained in:
2026-05-19 11:32:32 +02:00
parent 254f915051
commit 797619acd9
3 changed files with 128 additions and 4 deletions

View File

@@ -9,6 +9,7 @@ from django.core.management import call_command
from django.core.management.base import CommandError
from django.test import TestCase
from pobsync.errors import ConfigError
from pobsync.util import write_yaml_atomic
from pobsync_backend.models import BackupRun, GlobalConfig, HostConfig, SnapshotRecord
@@ -51,6 +52,99 @@ class RunBackupRecordsSnapshotTests(TestCase):
self.assertEqual(record.kind, "scheduled")
self.assertEqual(record.status, "success")
def test_prune_uses_sql_retention_after_snapshot_record_is_created(self) -> None:
with TemporaryDirectory() as tmp:
backup_root = Path(tmp) / "backups"
GlobalConfig.objects.create(name="default", backup_root=str(backup_root))
host = HostConfig.objects.create(host="web-01", address="web-01.example.test")
snapshot_dir = backup_root / host.host / "scheduled" / "20260519-021500Z__ABCDEFGH"
meta_dir = snapshot_dir / "meta"
meta_dir.mkdir(parents=True)
write_yaml_atomic(meta_dir / "meta.yaml", {"status": "success", "started_at": "2026-05-19T02:15:00Z"})
with (
patch("pobsync_backend.management.commands.run_pobsync_backup.run_scheduled") as run_scheduled,
patch(
"pobsync_backend.management.commands.run_pobsync_backup.run_sql_retention_apply"
) as retention_apply,
):
run_scheduled.return_value = {
"ok": True,
"dry_run": False,
"host": host.host,
"snapshot": str(snapshot_dir),
"base": None,
"rsync": {"exit_code": 0},
}
retention_apply.return_value = {"ok": True, "source": "sql", "deleted": []}
call_command(
"run_pobsync_backup",
host.host,
prefix=str(Path(tmp) / "home"),
prune=True,
prune_max_delete=3,
prune_protect_bases=True,
stdout=StringIO(),
)
run_scheduled.assert_called_once()
self.assertFalse(run_scheduled.call_args.kwargs["prune"])
retention_apply.assert_called_once_with(
prefix=Path(tmp) / "home",
host=host.host,
kind="scheduled",
protect_bases=True,
yes=True,
max_delete=3,
acquire_lock=False,
)
run = BackupRun.objects.get()
self.assertEqual(run.status, BackupRun.Status.SUCCESS)
self.assertEqual(run.result["prune"], {"ok": True, "source": "sql", "deleted": []})
def test_prune_failure_is_recorded_on_backup_run(self) -> None:
with TemporaryDirectory() as tmp:
backup_root = Path(tmp) / "backups"
GlobalConfig.objects.create(name="default", backup_root=str(backup_root))
host = HostConfig.objects.create(host="web-01", address="web-01.example.test")
snapshot_dir = backup_root / host.host / "scheduled" / "20260519-021500Z__ABCDEFGH"
meta_dir = snapshot_dir / "meta"
meta_dir.mkdir(parents=True)
write_yaml_atomic(meta_dir / "meta.yaml", {"status": "success", "started_at": "2026-05-19T02:15:00Z"})
with (
patch("pobsync_backend.management.commands.run_pobsync_backup.run_scheduled") as run_scheduled,
patch(
"pobsync_backend.management.commands.run_pobsync_backup.run_sql_retention_apply"
) as retention_apply,
):
run_scheduled.return_value = {
"ok": True,
"dry_run": False,
"host": host.host,
"snapshot": str(snapshot_dir),
"base": None,
"rsync": {"exit_code": 0},
}
retention_apply.side_effect = ConfigError("Deletion blocked by --max-delete=0")
with self.assertRaises(ConfigError):
call_command(
"run_pobsync_backup",
host.host,
prefix=str(Path(tmp) / "home"),
prune=True,
prune_max_delete=0,
stdout=StringIO(),
)
run = BackupRun.objects.get()
self.assertEqual(run.status, BackupRun.Status.FAILED)
self.assertIsNotNone(run.snapshot)
self.assertEqual(run.result["prune"]["ok"], False)
self.assertEqual(run.result["prune"]["type"], "ConfigError")
self.assertEqual(run.result["prune"]["error"], "Deletion blocked by --max-delete=0")
def test_failed_backup_upserts_incomplete_snapshot_record(self) -> None:
with TemporaryDirectory() as tmp:
backup_root = Path(tmp) / "backups"