from __future__ import annotations from datetime import timedelta from pathlib import Path from tempfile import TemporaryDirectory from unittest.mock import patch from django.test import TestCase from django.utils import timezone from pobsync.util import write_yaml_atomic from pobsync_backend.backup_runner import queue_backup_run, reconcile_running_runs from pobsync_backend.management.commands.run_pobsync_worker import Command from pobsync_backend.models import BackupRun, GlobalConfig, HostConfig, SnapshotRecord class BackupWorkerTests(TestCase): def test_queue_backup_run_records_requested_options(self) -> None: host = HostConfig.objects.create(host="web-01", address="web-01.example.test") run = queue_backup_run( host=host, dry_run=True, prune=True, prune_max_delete=3, prune_protect_bases=True, ) self.assertEqual(run.status, BackupRun.Status.QUEUED) self.assertEqual(run.run_type, BackupRun.RunType.MANUAL) self.assertEqual( run.result["requested"], { "dry_run": True, "verbose_output": True, "prune": True, "prune_max_delete": 3, "prune_protect_bases": True, }, ) def test_queue_backup_run_can_request_verbose_output(self) -> None: host = HostConfig.objects.create(host="web-01", address="web-01.example.test") run = queue_backup_run(host=host, verbose_output=True) self.assertTrue(run.result["requested"]["verbose_output"]) def test_worker_executes_next_queued_run(self) -> None: with TemporaryDirectory() as tmp: backup_root = Path(tmp) / "backups" GlobalConfig.objects.create(name="default", backup_root=str(backup_root)) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") snapshot_dir = backup_root / host.host / "scheduled" / "20260519-021500Z__ABCDEFGH" meta_dir = snapshot_dir / "meta" meta_dir.mkdir(parents=True) write_yaml_atomic(meta_dir / "meta.yaml", {"status": "success", "started_at": "2026-05-19T02:15:00Z"}) run = queue_backup_run(host=host, verbose_output=True) with patch("pobsync_backend.backup_runner.run_scheduled") as run_scheduled: def fake_run_scheduled(**kwargs): run.refresh_from_db() self.assertIn("execution", run.result) return { "ok": True, "dry_run": False, "host": host.host, "snapshot": str(snapshot_dir), "base": None, "rsync": {"exit_code": 0}, } run_scheduled.side_effect = fake_run_scheduled count = Command()._run_once(prefix=Path(tmp) / "home") run_scheduled.assert_called_once() self.assertEqual(count, 1) self.assertEqual(run_scheduled.call_args.kwargs["run_id"], run.id) self.assertTrue(run_scheduled.call_args.kwargs["verbose_output"]) run.refresh_from_db() self.assertEqual(run.status, BackupRun.Status.SUCCESS) self.assertEqual(SnapshotRecord.objects.count(), 1) self.assertEqual(run.snapshot, SnapshotRecord.objects.get()) def test_worker_records_dry_run_log_path_while_running(self) -> None: with TemporaryDirectory() as tmp: GlobalConfig.objects.create(name="default", backup_root=str(Path(tmp) / "backups")) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") run = queue_backup_run(host=host, dry_run=True) with patch("pobsync_backend.backup_runner.run_scheduled") as run_scheduled: def fake_run_scheduled(**kwargs): run.refresh_from_db() self.assertEqual( run.result["execution"]["log"], f"/tmp/pobsync-dryrun/{host.host}/run-{run.id}/rsync.log", ) return { "ok": True, "dry_run": True, "host": host.host, "base": None, "log": run.result["execution"]["log"], "rsync": {"exit_code": 0}, } run_scheduled.side_effect = fake_run_scheduled count = Command()._run_once(prefix=Path(tmp) / "home") self.assertEqual(count, 1) run.refresh_from_db() self.assertEqual(run.status, BackupRun.Status.SUCCESS) self.assertEqual(run.result["log"], f"/tmp/pobsync-dryrun/{host.host}/run-{run.id}/rsync.log") def test_worker_preserves_cancelled_status_from_running_run(self) -> None: with TemporaryDirectory() as tmp: GlobalConfig.objects.create(name="default", backup_root=str(Path(tmp) / "backups")) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") run = queue_backup_run(host=host, dry_run=True) with patch("pobsync_backend.backup_runner.run_scheduled") as run_scheduled: def fake_run_scheduled(**kwargs): BackupRun.objects.filter(id=run.id).update(status=BackupRun.Status.CANCELLED) self.assertTrue(kwargs["cancel_check"]()) return { "ok": False, "dry_run": True, "cancelled": True, "host": host.host, "base": None, "log": f"/tmp/pobsync-dryrun/{host.host}/run-{run.id}/rsync.log", "rsync": {"exit_code": 130}, } run_scheduled.side_effect = fake_run_scheduled count = Command()._run_once(prefix=Path(tmp) / "home") self.assertEqual(count, 1) run.refresh_from_db() self.assertEqual(run.status, BackupRun.Status.CANCELLED) self.assertEqual(run.rsync_exit_code, 130) def test_worker_returns_zero_without_queued_runs(self) -> None: count = Command()._run_once(prefix=Path("/opt/pobsync")) self.assertEqual(count, 0) def test_worker_reconciles_running_dry_run_with_terminal_broken_pipe_log(self) -> None: host = HostConfig.objects.create(host="web-01", address="web-01.example.test") run = queue_backup_run(host=host, dry_run=True) log_path = Path("/tmp/pobsync-dryrun/web-01/run-test-broken-pipe.log") log_path.parent.mkdir(parents=True, exist_ok=True) log_path.write_text( "rsync error: unexplained error (code 255) at rsync.c(716) [generator=3.4.1]\n" "rsync: [generator] write error: Broken pipe (32)\n", encoding="utf-8", ) run.status = BackupRun.Status.RUNNING run.started_at = timezone.now() run.result["execution"] = {"log": str(log_path)} run.save(update_fields=["status", "started_at", "result"]) reconciled = reconcile_running_runs() self.assertEqual(reconciled, 1) run.refresh_from_db() self.assertEqual(run.status, BackupRun.Status.FAILED) self.assertEqual(run.rsync_exit_code, 255) self.assertEqual(run.result["failure"]["category"], "transport") self.assertIn("Broken pipe", "\n".join(run.result["rsync"]["log_tail"])) def test_worker_reconciles_stale_running_dry_run_after_timeout(self) -> None: host = HostConfig.objects.create(host="web-01", address="web-01.example.test") run = queue_backup_run(host=host, dry_run=True) run.status = BackupRun.Status.RUNNING run.started_at = timezone.now() - timedelta(seconds=1300) run.result["timeout_seconds"] = 900 run.save(update_fields=["status", "started_at", "result"]) reconciled = reconcile_running_runs(grace_seconds=300) self.assertEqual(reconciled, 1) run.refresh_from_db() self.assertEqual(run.status, BackupRun.Status.FAILED) self.assertEqual(run.rsync_exit_code, 124) self.assertEqual(run.result["failure"]["category"], "timeout")