2 Commits

Author SHA1 Message Date
797619acd9 Run post-backup pruning through SQL retention
Stop passing prune options into the legacy scheduled backup engine from the
Django backup command. Record the completed snapshot first, then apply retention
through the SQL-backed retention service so pruning sees the same SnapshotRecord
state as the admin and retention command.

Also record prune failures on BackupRun.result instead of leaving the run in an
ambiguous state.
2026-05-19 11:32:32 +02:00
254f915051 Plan Django retention from snapshot records 2026-05-19 11:24:48 +02:00
8 changed files with 532 additions and 87 deletions

View File

@@ -140,6 +140,8 @@ SQLite remains the default because it is enough for a single backup server and k
The public command surface is Django-first. The old YAML/cron CLI has been retired from the `pobsync` entrypoint. The public command surface is Django-first. The old YAML/cron CLI has been retired from the `pobsync` entrypoint.
Discovered snapshots are stored in `SnapshotRecord`, including the base snapshot metadata and a nullable SQL link to the Discovered snapshots are stored in `SnapshotRecord`, including the base snapshot metadata and a nullable SQL link to the
base record when it is known. base record when it is known.
The Django retention command plans from `SnapshotRecord` instead of rediscovering snapshots from the filesystem.
Post-backup pruning from Django also uses the SQL retention service after the completed snapshot is recorded.
The remaining internal engine code still contains reusable backup primitives: The remaining internal engine code still contains reusable backup primitives:

View File

@@ -2,12 +2,12 @@ from __future__ import annotations
from datetime import datetime, timezone from datetime import datetime, timezone
from pathlib import Path from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple from typing import Any, List
from ..config.source import ConfigSource, FileConfigSource from ..config.source import ConfigSource, FileConfigSource
from ..errors import ConfigError from ..errors import ConfigError
from ..paths import PobsyncPaths from ..paths import PobsyncPaths
from ..retention import Snapshot, build_retention_plan from ..retention import Snapshot, apply_base_protection, build_retention_plan
from ..snapshot_meta import iter_snapshot_dirs, read_snapshot_meta, resolve_host_root from ..snapshot_meta import iter_snapshot_dirs, read_snapshot_meta, resolve_host_root
from ..util import sanitize_host from ..util import sanitize_host
@@ -28,59 +28,6 @@ def _parse_snapshot_dt(dirname: str, meta: dict) -> datetime:
return datetime.fromtimestamp(0, tz=timezone.utc) return datetime.fromtimestamp(0, tz=timezone.utc)
def _apply_base_protection(
snapshots: List[Snapshot],
keep: set[str],
reasons: Dict[str, List[str]],
) -> Tuple[set[str], Dict[str, List[str]]]:
"""
Optional policy: if a kept snapshot has a base (kind+dirname), also keep that base snapshot.
This is NOT required for hardlink snapshots to remain readable, but can be useful
for performance (better base selection) or "chain" readability.
Adds reason: "base-of:<child_dirname>"
"""
# Index snapshots by (kind, dirname)
idx: Dict[Tuple[str, str], Snapshot] = {(s.kind, s.dirname): s for s in snapshots}
changed = True
while changed:
changed = False
# Iterate over a stable list of current keep items
for child_dirname in list(keep):
# Find the child snapshot (may exist in multiple kinds; check both)
child: Optional[Snapshot] = None
for k in ("scheduled", "manual"):
child = idx.get((k, child_dirname))
if child is not None:
break
if child is None:
continue
base = child.base
if not isinstance(base, dict):
continue
base_kind = base.get("kind")
base_dirname = base.get("dirname")
if not isinstance(base_kind, str) or not isinstance(base_dirname, str):
continue
base_snap = idx.get((base_kind, base_dirname))
if base_snap is None:
# Base might have been pruned already or never existed; ignore.
continue
if base_dirname not in keep:
keep.add(base_dirname)
reasons.setdefault(base_dirname, []).append(f"base-of:{child_dirname}")
changed = True
return keep, reasons
def run_retention_plan( def run_retention_plan(
prefix: Path, prefix: Path,
host: str, host: str,
@@ -142,7 +89,7 @@ def run_retention_plan(
reasons = dict(plan.reasons) reasons = dict(plan.reasons)
if protect_bases: if protect_bases:
keep, reasons = _apply_base_protection(snapshots=snapshots, keep=keep, reasons=reasons) keep, reasons = apply_base_protection(snapshots=snapshots, keep=keep, reasons=reasons)
delete = [s for s in snapshots if s.dirname not in keep] delete = [s for s in snapshots if s.dirname not in keep]

View File

@@ -123,3 +123,50 @@ def build_retention_plan(
return RetentionResult(keep=keep, reasons=reasons) return RetentionResult(keep=keep, reasons=reasons)
def apply_base_protection(
*,
snapshots: Iterable[Snapshot],
keep: Set[str],
reasons: Dict[str, List[str]],
) -> Tuple[Set[str], Dict[str, List[str]]]:
"""
If a kept snapshot has a base (kind+dirname), also keep that base snapshot.
Hardlink snapshots remain readable without this, but keeping bases can make
future base selection and chain inspection easier.
"""
snapshot_list = list(snapshots)
index: Dict[Tuple[str, str], Snapshot] = {(snapshot.kind, snapshot.dirname): snapshot for snapshot in snapshot_list}
changed = True
while changed:
changed = False
for child_dirname in list(keep):
child = _find_snapshot_by_dirname(snapshot_list, child_dirname)
if child is None or not isinstance(child.base, dict):
continue
base_kind = child.base.get("kind")
base_dirname = child.base.get("dirname")
if not isinstance(base_kind, str) or not isinstance(base_dirname, str):
continue
base_snapshot = index.get((base_kind, base_dirname))
if base_snapshot is None or base_dirname in keep:
continue
keep.add(base_dirname)
reasons.setdefault(base_dirname, []).append(f"base-of:{child_dirname}")
changed = True
return keep, reasons
def _find_snapshot_by_dirname(snapshots: Iterable[Snapshot], dirname: str) -> Snapshot | None:
for kind in ("scheduled", "manual"):
for snapshot in snapshots:
if snapshot.kind == kind and snapshot.dirname == dirname:
return snapshot
return None

View File

@@ -11,6 +11,7 @@ from pobsync.commands.run_scheduled import run_scheduled
from pobsync.paths import PobsyncPaths from pobsync.paths import PobsyncPaths
from pobsync_backend.config_source import DjangoConfigSource from pobsync_backend.config_source import DjangoConfigSource
from pobsync_backend.models import BackupRun, HostConfig from pobsync_backend.models import BackupRun, HostConfig
from pobsync_backend.retention import run_sql_retention_apply
from pobsync_backend.snapshot_discovery import infer_snapshot_kind, upsert_snapshot_record from pobsync_backend.snapshot_discovery import infer_snapshot_kind, upsert_snapshot_record
@@ -45,9 +46,7 @@ class Command(BaseCommand):
prefix=paths.home, prefix=paths.home,
host=host.host, host=host.host,
dry_run=bool(options["dry_run"]), dry_run=bool(options["dry_run"]),
prune=bool(options["prune"]), prune=False,
prune_max_delete=int(options["prune_max_delete"]),
prune_protect_bases=bool(options["prune_protect_bases"]),
config_source=DjangoConfigSource(), config_source=DjangoConfigSource(),
) )
except Exception as exc: except Exception as exc:
@@ -72,6 +71,22 @@ class Command(BaseCommand):
snapshot_record, _created = upsert_snapshot_record(host=host, kind=kind, snapshot_dir=snapshot_path) snapshot_record, _created = upsert_snapshot_record(host=host, kind=kind, snapshot_dir=snapshot_path)
except ValueError: except ValueError:
snapshot_record = None snapshot_record = None
if result.get("ok") and not result.get("dry_run") and options["prune"]:
try:
result["prune"] = run_sql_retention_apply(
prefix=paths.home,
host=host.host,
kind="scheduled",
protect_bases=bool(options["prune_protect_bases"]),
yes=True,
max_delete=int(options["prune_max_delete"]),
acquire_lock=False,
)
except Exception as exc:
result["prune"] = {"ok": False, "error": str(exc), "type": type(exc).__name__}
run.status = BackupRun.Status.FAILED
run.result = result
run.snapshot = snapshot_record run.snapshot = snapshot_record
run.save( run.save(
update_fields=[ update_fields=[
@@ -84,6 +99,21 @@ class Command(BaseCommand):
"result", "result",
], ],
) )
raise
run.snapshot = snapshot_record
run.result = result
run.save(
update_fields=[
"status",
"ended_at",
"snapshot_path",
"snapshot",
"base_path",
"rsync_exit_code",
"result",
],
)
if result.get("ok"): if result.get("ok"):
self.stdout.write(self.style.SUCCESS(f"Backup completed for {host.host}.")) self.stdout.write(self.style.SUCCESS(f"Backup completed for {host.host}."))

View File

@@ -7,10 +7,8 @@ from typing import Any
from django.conf import settings from django.conf import settings
from django.core.management.base import BaseCommand, CommandError from django.core.management.base import BaseCommand, CommandError
from pobsync.commands.retention_apply import run_retention_apply from pobsync.errors import ConfigError
from pobsync.commands.retention_plan import run_retention_plan from pobsync_backend.retention import run_sql_retention_apply, run_sql_retention_plan
from pobsync_backend.config_source import DjangoConfigSource
from pobsync_backend.models import HostConfig
class Command(BaseCommand): class Command(BaseCommand):
@@ -27,29 +25,25 @@ class Command(BaseCommand):
def handle(self, *args: Any, **options: Any) -> None: def handle(self, *args: Any, **options: Any) -> None:
host = options["host"] host = options["host"]
if not HostConfig.objects.filter(host=host, enabled=True).exists(): try:
raise CommandError(f"Missing enabled HostConfig {host!r}")
config_source = DjangoConfigSource()
if options["apply"]: if options["apply"]:
if not options["yes"]: if not options["yes"]:
raise CommandError("--yes is required with --apply") raise CommandError("--yes is required with --apply")
result = run_retention_apply( result = run_sql_retention_apply(
prefix=Path(options["prefix"]), prefix=Path(options["prefix"]),
host=host, host=host,
kind=options["kind"], kind=options["kind"],
protect_bases=bool(options["protect_bases"]), protect_bases=bool(options["protect_bases"]),
yes=True, yes=True,
max_delete=int(options["max_delete"]), max_delete=int(options["max_delete"]),
config_source=config_source,
) )
else: else:
result = run_retention_plan( result = run_sql_retention_plan(
prefix=Path(options["prefix"]),
host=host, host=host,
kind=options["kind"], kind=options["kind"],
protect_bases=bool(options["protect_bases"]), protect_bases=bool(options["protect_bases"]),
config_source=config_source,
) )
except ConfigError as exc:
raise CommandError(str(exc)) from exc
self.stdout.write(json.dumps(result, indent=2, sort_keys=False)) self.stdout.write(json.dumps(result, indent=2, sort_keys=False))

View File

@@ -0,0 +1,182 @@
from __future__ import annotations
import shutil
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from pobsync.errors import ConfigError
from pobsync.lock import acquire_host_lock
from pobsync.paths import PobsyncPaths
from pobsync.retention import Snapshot, apply_base_protection, build_retention_plan
from pobsync.util import sanitize_host
from .models import HostConfig, SnapshotRecord
def run_sql_retention_plan(*, host: str, kind: str, protect_bases: bool) -> dict[str, Any]:
host = sanitize_host(host)
if kind not in {"scheduled", "manual", "all"}:
raise ConfigError("kind must be scheduled, manual, or all")
host_config = _enabled_host_config(host)
retention = _retention_for_host(host_config)
snapshots = _snapshots_for_retention(host_config=host_config, kind=kind)
plan = build_retention_plan(
snapshots=snapshots,
retention=retention,
now=datetime.now(timezone.utc),
)
keep = set(plan.keep)
reasons = dict(plan.reasons)
if protect_bases:
keep, reasons = apply_base_protection(snapshots=snapshots, keep=keep, reasons=reasons)
delete = [snapshot for snapshot in snapshots if snapshot.dirname not in keep]
return {
"ok": True,
"host": host,
"kind": kind,
"protect_bases": bool(protect_bases),
"retention": retention,
"source": "sql",
"keep": sorted(keep),
"delete": [_snapshot_to_delete_item(snapshot) for snapshot in delete],
"reasons": reasons,
}
def run_sql_retention_apply(
*,
prefix: Path,
host: str,
kind: str,
protect_bases: bool,
yes: bool,
max_delete: int,
acquire_lock: bool = True,
) -> dict[str, Any]:
host = sanitize_host(host)
if not yes:
raise ConfigError("Refusing to delete snapshots without --yes")
if max_delete < 0:
raise ConfigError("--max-delete must be >= 0")
paths = PobsyncPaths(home=prefix)
def _do_apply() -> dict[str, Any]:
plan = run_sql_retention_plan(host=host, kind=kind, protect_bases=bool(protect_bases))
delete_list = plan.get("delete") or []
if not isinstance(delete_list, list):
raise ConfigError("Invalid retention plan output: delete is not a list")
if max_delete == 0 and len(delete_list) > 0:
raise ConfigError("Deletion blocked by --max-delete=0")
if len(delete_list) > max_delete:
raise ConfigError(f"Refusing to delete {len(delete_list)} snapshots (exceeds --max-delete={max_delete})")
actions: list[str] = []
deleted: list[dict[str, Any]] = []
for item in delete_list:
dirname = item.get("dirname") if isinstance(item, dict) else None
snap_kind = item.get("kind") if isinstance(item, dict) else None
snap_path = item.get("path") if isinstance(item, dict) else None
if not isinstance(dirname, str) or not isinstance(snap_kind, str) or not isinstance(snap_path, str):
continue
if snap_kind not in {"scheduled", "manual"}:
raise ConfigError(f"Refusing to delete unsupported snapshot kind: {snap_kind!r}")
path = Path(snap_path)
if not path.exists():
actions.append(f"skip missing {snap_kind}/{dirname}")
continue
if not path.is_dir():
raise ConfigError(f"Refusing to delete non-directory path: {snap_path}")
shutil.rmtree(path)
SnapshotRecord.objects.filter(host__host=host, kind=snap_kind, dirname=dirname).delete()
actions.append(f"deleted {snap_kind} {dirname}")
deleted.append({"dirname": dirname, "kind": snap_kind, "path": snap_path})
return {
"ok": True,
"host": host,
"kind": kind,
"protect_bases": bool(protect_bases),
"max_delete": max_delete,
"source": "sql",
"deleted": deleted,
"actions": actions,
}
if acquire_lock:
with acquire_host_lock(paths.locks_dir, host, command="retention-apply"):
return _do_apply()
return _do_apply()
def _enabled_host_config(host: str) -> HostConfig:
try:
return HostConfig.objects.get(host=host, enabled=True)
except HostConfig.DoesNotExist as exc:
raise ConfigError(f"Missing enabled HostConfig {host!r}") from exc
def _retention_for_host(host_config: HostConfig) -> dict[str, int]:
return {
"daily": host_config.retention_daily,
"weekly": host_config.retention_weekly,
"monthly": host_config.retention_monthly,
"yearly": host_config.retention_yearly,
}
def _snapshots_for_retention(*, host_config: HostConfig, kind: str) -> list[Snapshot]:
kinds = ["scheduled", "manual"] if kind == "all" else [kind]
records = (
SnapshotRecord.objects.filter(host=host_config, kind__in=kinds)
.exclude(kind=SnapshotRecord.Kind.INCOMPLETE)
.select_related("base")
.order_by("-started_at", "dirname")
)
return [_snapshot_from_record(record) for record in records]
def _snapshot_from_record(record: SnapshotRecord) -> Snapshot:
return Snapshot(
kind=record.kind,
dirname=record.dirname,
path=record.path,
dt=record.started_at or datetime.fromtimestamp(0, tz=timezone.utc),
status=record.status or None,
base=_base_meta_from_record(record),
)
def _base_meta_from_record(record: SnapshotRecord) -> dict[str, str] | None:
if record.base is not None:
return {
"kind": record.base.kind,
"dirname": record.base.dirname,
"path": record.base.path,
}
if record.base_kind and record.base_dirname:
return {
"kind": record.base_kind,
"dirname": record.base_dirname,
"path": record.base_path,
}
return None
def _snapshot_to_delete_item(snapshot: Snapshot) -> dict[str, Any]:
return {
"dirname": snapshot.dirname,
"kind": snapshot.kind,
"path": snapshot.path,
"dt": snapshot.dt.isoformat(),
"status": snapshot.status,
}

View File

@@ -9,6 +9,7 @@ from django.core.management import call_command
from django.core.management.base import CommandError from django.core.management.base import CommandError
from django.test import TestCase from django.test import TestCase
from pobsync.errors import ConfigError
from pobsync.util import write_yaml_atomic from pobsync.util import write_yaml_atomic
from pobsync_backend.models import BackupRun, GlobalConfig, HostConfig, SnapshotRecord from pobsync_backend.models import BackupRun, GlobalConfig, HostConfig, SnapshotRecord
@@ -51,6 +52,99 @@ class RunBackupRecordsSnapshotTests(TestCase):
self.assertEqual(record.kind, "scheduled") self.assertEqual(record.kind, "scheduled")
self.assertEqual(record.status, "success") self.assertEqual(record.status, "success")
def test_prune_uses_sql_retention_after_snapshot_record_is_created(self) -> None:
with TemporaryDirectory() as tmp:
backup_root = Path(tmp) / "backups"
GlobalConfig.objects.create(name="default", backup_root=str(backup_root))
host = HostConfig.objects.create(host="web-01", address="web-01.example.test")
snapshot_dir = backup_root / host.host / "scheduled" / "20260519-021500Z__ABCDEFGH"
meta_dir = snapshot_dir / "meta"
meta_dir.mkdir(parents=True)
write_yaml_atomic(meta_dir / "meta.yaml", {"status": "success", "started_at": "2026-05-19T02:15:00Z"})
with (
patch("pobsync_backend.management.commands.run_pobsync_backup.run_scheduled") as run_scheduled,
patch(
"pobsync_backend.management.commands.run_pobsync_backup.run_sql_retention_apply"
) as retention_apply,
):
run_scheduled.return_value = {
"ok": True,
"dry_run": False,
"host": host.host,
"snapshot": str(snapshot_dir),
"base": None,
"rsync": {"exit_code": 0},
}
retention_apply.return_value = {"ok": True, "source": "sql", "deleted": []}
call_command(
"run_pobsync_backup",
host.host,
prefix=str(Path(tmp) / "home"),
prune=True,
prune_max_delete=3,
prune_protect_bases=True,
stdout=StringIO(),
)
run_scheduled.assert_called_once()
self.assertFalse(run_scheduled.call_args.kwargs["prune"])
retention_apply.assert_called_once_with(
prefix=Path(tmp) / "home",
host=host.host,
kind="scheduled",
protect_bases=True,
yes=True,
max_delete=3,
acquire_lock=False,
)
run = BackupRun.objects.get()
self.assertEqual(run.status, BackupRun.Status.SUCCESS)
self.assertEqual(run.result["prune"], {"ok": True, "source": "sql", "deleted": []})
def test_prune_failure_is_recorded_on_backup_run(self) -> None:
with TemporaryDirectory() as tmp:
backup_root = Path(tmp) / "backups"
GlobalConfig.objects.create(name="default", backup_root=str(backup_root))
host = HostConfig.objects.create(host="web-01", address="web-01.example.test")
snapshot_dir = backup_root / host.host / "scheduled" / "20260519-021500Z__ABCDEFGH"
meta_dir = snapshot_dir / "meta"
meta_dir.mkdir(parents=True)
write_yaml_atomic(meta_dir / "meta.yaml", {"status": "success", "started_at": "2026-05-19T02:15:00Z"})
with (
patch("pobsync_backend.management.commands.run_pobsync_backup.run_scheduled") as run_scheduled,
patch(
"pobsync_backend.management.commands.run_pobsync_backup.run_sql_retention_apply"
) as retention_apply,
):
run_scheduled.return_value = {
"ok": True,
"dry_run": False,
"host": host.host,
"snapshot": str(snapshot_dir),
"base": None,
"rsync": {"exit_code": 0},
}
retention_apply.side_effect = ConfigError("Deletion blocked by --max-delete=0")
with self.assertRaises(ConfigError):
call_command(
"run_pobsync_backup",
host.host,
prefix=str(Path(tmp) / "home"),
prune=True,
prune_max_delete=0,
stdout=StringIO(),
)
run = BackupRun.objects.get()
self.assertEqual(run.status, BackupRun.Status.FAILED)
self.assertIsNotNone(run.snapshot)
self.assertEqual(run.result["prune"]["ok"], False)
self.assertEqual(run.result["prune"]["type"], "ConfigError")
self.assertEqual(run.result["prune"]["error"], "Deletion blocked by --max-delete=0")
def test_failed_backup_upserts_incomplete_snapshot_record(self) -> None: def test_failed_backup_upserts_incomplete_snapshot_record(self) -> None:
with TemporaryDirectory() as tmp: with TemporaryDirectory() as tmp:
backup_root = Path(tmp) / "backups" backup_root = Path(tmp) / "backups"

View File

@@ -0,0 +1,149 @@
from __future__ import annotations
import json
from datetime import datetime, timezone
from io import StringIO
from pathlib import Path
from tempfile import TemporaryDirectory
from django.core.management import call_command
from django.test import TestCase
from pobsync.errors import ConfigError
from pobsync_backend.models import HostConfig, SnapshotRecord
from pobsync_backend.retention import run_sql_retention_apply, run_sql_retention_plan
class SqlRetentionTests(TestCase):
def test_plan_uses_snapshot_records(self) -> None:
host = HostConfig.objects.create(
host="web-01",
address="web-01.example.test",
retention_daily=0,
retention_weekly=0,
retention_monthly=0,
retention_yearly=0,
)
old = self._snapshot(host, "20260518-021500Z__OLD")
new = self._snapshot(host, "20260519-021500Z__NEW")
plan = run_sql_retention_plan(host=host.host, kind="scheduled", protect_bases=False)
self.assertEqual(plan["source"], "sql")
self.assertEqual(plan["keep"], [new.dirname])
self.assertEqual([item["dirname"] for item in plan["delete"]], [old.dirname])
def test_plan_can_protect_base_snapshot_from_sql_relation(self) -> None:
host = HostConfig.objects.create(
host="web-01",
address="web-01.example.test",
retention_daily=0,
retention_weekly=0,
retention_monthly=0,
retention_yearly=0,
)
base = self._snapshot(host, "20260518-021500Z__BASE")
child = self._snapshot(host, "20260519-021500Z__CHILD", base=base)
plan = run_sql_retention_plan(host=host.host, kind="scheduled", protect_bases=True)
self.assertEqual(plan["keep"], [base.dirname, child.dirname])
self.assertEqual(plan["delete"], [])
self.assertEqual(plan["reasons"][base.dirname], [f"base-of:{child.dirname}"])
def test_apply_deletes_snapshot_directory_and_record(self) -> None:
with TemporaryDirectory() as tmp:
prefix = Path(tmp) / "home"
host = HostConfig.objects.create(
host="web-01",
address="web-01.example.test",
retention_daily=0,
retention_weekly=0,
retention_monthly=0,
retention_yearly=0,
)
old_dir = Path(tmp) / "backups" / host.host / "scheduled" / "20260518-021500Z__OLD"
new_dir = Path(tmp) / "backups" / host.host / "scheduled" / "20260519-021500Z__NEW"
old_dir.mkdir(parents=True)
new_dir.mkdir(parents=True)
old = self._snapshot(host, old_dir.name, path=str(old_dir))
new = self._snapshot(host, new_dir.name, path=str(new_dir))
result = run_sql_retention_apply(
prefix=prefix,
host=host.host,
kind="scheduled",
protect_bases=False,
yes=True,
max_delete=1,
acquire_lock=False,
)
self.assertFalse(old_dir.exists())
self.assertTrue(new_dir.exists())
self.assertTrue(SnapshotRecord.objects.filter(pk=new.pk).exists())
self.assertFalse(SnapshotRecord.objects.filter(pk=old.pk).exists())
self.assertEqual(result["deleted"], [{"dirname": old.dirname, "kind": "scheduled", "path": str(old_dir)}])
def test_apply_respects_max_delete(self) -> None:
host = HostConfig.objects.create(
host="web-01",
address="web-01.example.test",
retention_daily=0,
retention_weekly=0,
retention_monthly=0,
retention_yearly=0,
)
self._snapshot(host, "20260517-021500Z__OLDER")
self._snapshot(host, "20260518-021500Z__OLD")
self._snapshot(host, "20260519-021500Z__NEW")
with self.assertRaisesRegex(ConfigError, "exceeds --max-delete=1"):
run_sql_retention_apply(
prefix=Path("/tmp/pobsync-test"),
host=host.host,
kind="scheduled",
protect_bases=False,
yes=True,
max_delete=1,
acquire_lock=False,
)
def test_management_command_plans_from_sql(self) -> None:
host = HostConfig.objects.create(
host="web-01",
address="web-01.example.test",
retention_daily=0,
retention_weekly=0,
retention_monthly=0,
retention_yearly=0,
)
old = self._snapshot(host, "20260518-021500Z__OLD")
new = self._snapshot(host, "20260519-021500Z__NEW")
stdout = StringIO()
call_command("run_pobsync_retention", host.host, stdout=stdout)
result = json.loads(stdout.getvalue())
self.assertEqual(result["source"], "sql")
self.assertEqual(result["keep"], [new.dirname])
self.assertEqual([item["dirname"] for item in result["delete"]], [old.dirname])
def _snapshot(
self,
host: HostConfig,
dirname: str,
*,
path: str | None = None,
base: SnapshotRecord | None = None,
) -> SnapshotRecord:
started_at = datetime.strptime(dirname.split("__", 1)[0], "%Y%m%d-%H%M%SZ").replace(tzinfo=timezone.utc)
return SnapshotRecord.objects.create(
host=host,
kind="scheduled",
dirname=dirname,
path=path or f"/backups/{host.host}/scheduled/{dirname}",
base=base,
status="success",
started_at=started_at,
)