from __future__ import annotations from datetime import timedelta from pathlib import Path from tempfile import TemporaryDirectory from unittest.mock import patch from django.test import TestCase from django.utils import timezone from pobsync.util import write_yaml_atomic from pobsync_backend.backup_runner import queue_backup_run, reconcile_running_runs from pobsync_backend.management.commands.run_pobsync_worker import Command from pobsync_backend.models import BackupRun, GlobalConfig, HostConfig, SnapshotRecord class BackupWorkerTests(TestCase): def test_queue_backup_run_records_requested_options(self) -> None: host = HostConfig.objects.create(host="web-01", address="web-01.example.test") run = queue_backup_run( host=host, dry_run=True, prune=True, prune_max_delete=3, prune_protect_bases=True, ) self.assertEqual(run.status, BackupRun.Status.QUEUED) self.assertEqual(run.run_type, BackupRun.RunType.MANUAL) self.assertEqual( run.result["requested"], { "dry_run": True, "verbose_output": True, "prune": True, "prune_max_delete": 3, "prune_protect_bases": True, }, ) def test_queue_backup_run_can_request_verbose_output(self) -> None: host = HostConfig.objects.create(host="web-01", address="web-01.example.test") run = queue_backup_run(host=host, verbose_output=True) self.assertTrue(run.result["requested"]["verbose_output"]) def test_worker_executes_next_queued_run(self) -> None: with TemporaryDirectory() as tmp: backup_root = Path(tmp) / "backups" GlobalConfig.objects.create(name="default", backup_root=str(backup_root)) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") snapshot_dir = backup_root / host.host / "scheduled" / "20260519-021500Z__ABCDEFGH" meta_dir = snapshot_dir / "meta" meta_dir.mkdir(parents=True) write_yaml_atomic(meta_dir / "meta.yaml", {"status": "success", "started_at": "2026-05-19T02:15:00Z"}) run = queue_backup_run(host=host, verbose_output=True) with patch("pobsync_backend.backup_runner.run_scheduled") as run_scheduled: def fake_run_scheduled(**kwargs): run.refresh_from_db() self.assertIn("execution", run.result) self.assertIn("worker_pid", run.result["execution"]) self.assertIn("worker_host", run.result["execution"]) self.assertIn("heartbeat_at", run.result["execution"]) return { "ok": True, "dry_run": False, "host": host.host, "snapshot": str(snapshot_dir), "base": None, "rsync": {"exit_code": 0}, } run_scheduled.side_effect = fake_run_scheduled count = Command()._run_once(prefix=Path(tmp) / "home") run_scheduled.assert_called_once() self.assertEqual(count, 1) self.assertEqual(run_scheduled.call_args.kwargs["run_id"], run.id) self.assertTrue(run_scheduled.call_args.kwargs["verbose_output"]) run.refresh_from_db() self.assertEqual(run.status, BackupRun.Status.SUCCESS) self.assertEqual(SnapshotRecord.objects.count(), 1) self.assertEqual(run.snapshot, SnapshotRecord.objects.get()) def test_worker_records_warning_status_from_completed_run(self) -> None: with TemporaryDirectory() as tmp: backup_root = Path(tmp) / "backups" GlobalConfig.objects.create(name="default", backup_root=str(backup_root)) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") snapshot_dir = backup_root / host.host / "scheduled" / "20260519-021500Z__ABCDEFGH" meta_dir = snapshot_dir / "meta" meta_dir.mkdir(parents=True) write_yaml_atomic(meta_dir / "meta.yaml", {"status": "warning", "started_at": "2026-05-19T02:15:00Z"}) run = queue_backup_run(host=host) with patch("pobsync_backend.backup_runner.run_scheduled") as run_scheduled: run_scheduled.return_value = { "ok": True, "status": "warning", "dry_run": False, "host": host.host, "snapshot": str(snapshot_dir), "base": None, "warning": {"category": "vanished"}, "rsync": {"exit_code": 24}, } count = Command()._run_once(prefix=Path(tmp) / "home") self.assertEqual(count, 1) run.refresh_from_db() self.assertEqual(run.status, BackupRun.Status.WARNING) self.assertEqual(run.rsync_exit_code, 24) self.assertEqual(run.result["warning"]["category"], "vanished") def test_worker_refreshes_heartbeat_while_run_is_active(self) -> None: with TemporaryDirectory() as tmp: GlobalConfig.objects.create(name="default", backup_root=str(Path(tmp) / "backups")) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") run = queue_backup_run(host=host) with patch("pobsync_backend.backup_runner.run_scheduled") as run_scheduled: def fake_run_scheduled(**kwargs): run.refresh_from_db() old_heartbeat = timezone.now() - timedelta(seconds=120) run.result["execution"]["heartbeat_at"] = old_heartbeat.isoformat() run.save(update_fields=["result"]) self.assertFalse(kwargs["cancel_check"]()) run.refresh_from_db() self.assertGreater( timezone.datetime.fromisoformat(run.result["execution"]["heartbeat_at"]), old_heartbeat, ) return { "ok": True, "dry_run": False, "host": host.host, "snapshot": "", "base": None, "rsync": {"exit_code": 0}, } run_scheduled.side_effect = fake_run_scheduled Command()._run_once(prefix=Path(tmp) / "home") def test_worker_records_real_run_log_path_while_running(self) -> None: with TemporaryDirectory() as tmp: GlobalConfig.objects.create(name="default", backup_root=str(Path(tmp) / "backups")) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") run = queue_backup_run(host=host) snapshot_dir = Path(tmp) / "backups" / host.host / ".incomplete" / "20260519-021500Z__ABCDEFGH" log_path = snapshot_dir / "meta" / "rsync.log" with patch("pobsync_backend.backup_runner.run_scheduled") as run_scheduled: def fake_run_scheduled(**kwargs): kwargs["state_callback"]( { "status": "running", "phase": "rsync", "snapshot": str(snapshot_dir), "log": str(log_path), "rsync": {"command": ["rsync"], "exit_code": None, "pid": 1234, "pgid": 1234}, } ) run.refresh_from_db() self.assertEqual(run.snapshot_path, str(snapshot_dir)) self.assertEqual(run.result["execution"]["phase"], "rsync") self.assertEqual(run.result["execution"]["log"], str(log_path)) self.assertEqual(run.result["execution"]["snapshot"], str(snapshot_dir)) self.assertEqual(run.result["rsync"]["command"], ["rsync"]) self.assertEqual(run.result["rsync"]["pid"], 1234) return { "ok": True, "dry_run": False, "host": host.host, "snapshot": "", "base": None, "rsync": {"exit_code": 0}, } run_scheduled.side_effect = fake_run_scheduled Command()._run_once(prefix=Path(tmp) / "home") def test_worker_reconciles_stale_real_run_after_heartbeat_timeout(self) -> None: host = HostConfig.objects.create(host="web-01", address="web-01.example.test") run = queue_backup_run(host=host) run.status = BackupRun.Status.RUNNING run.started_at = timezone.now() - timedelta(seconds=120) run.result["execution"] = { "worker_pid": 123, "worker_host": "backup", "heartbeat_at": (timezone.now() - timedelta(seconds=90)).isoformat(), } run.save(update_fields=["status", "started_at", "result"]) reconciled = reconcile_running_runs(stale_worker_seconds=30) self.assertEqual(reconciled, 1) run.refresh_from_db() self.assertEqual(run.status, BackupRun.Status.FAILED) self.assertEqual(run.result["failure"]["category"], "worker") self.assertIn("heartbeat stopped", run.result["failure"]["message"]) def test_worker_reconciles_real_run_with_terminal_broken_pipe_log(self) -> None: with TemporaryDirectory() as tmp: host = HostConfig.objects.create(host="web-01", address="web-01.example.test") run = queue_backup_run(host=host) log_path = Path(tmp) / "backups" / host.host / ".incomplete" / "20260519-021500Z__ABCDEFGH" / "meta" / "rsync.log" log_path.parent.mkdir(parents=True, exist_ok=True) log_path.write_text( "rsync error: unexplained error (code 255) at rsync.c(716) [generator=3.4.1]\n" "rsync error: received SIGUSR1 (code 19) at main.c(1600) [receiver=3.4.1]\n" "rsync: [generator] write error: Broken pipe (32)\n", encoding="utf-8", ) run.status = BackupRun.Status.RUNNING run.started_at = timezone.now() run.result["execution"] = {"log": str(log_path)} run.save(update_fields=["status", "started_at", "result"]) reconciled = reconcile_running_runs() self.assertEqual(reconciled, 1) run.refresh_from_db() self.assertEqual(run.status, BackupRun.Status.FAILED) self.assertEqual(run.rsync_exit_code, 255) self.assertEqual(run.result["failure"]["category"], "transport") self.assertIn("Broken pipe", "\n".join(run.result["rsync"]["log_tail"])) def test_worker_reconciles_real_run_when_rsync_process_disappears(self) -> None: with TemporaryDirectory() as tmp: host = HostConfig.objects.create(host="web-01", address="web-01.example.test") run = queue_backup_run(host=host) log_path = Path(tmp) / "backups" / host.host / ".incomplete" / "20260519-021500Z__ABCDEFGH" / "meta" / "rsync.log" log_path.parent.mkdir(parents=True, exist_ok=True) log_path.write_text("sending incremental file list\n", encoding="utf-8") run.status = BackupRun.Status.RUNNING run.started_at = timezone.now() - timedelta(minutes=10) run.result["execution"] = { "phase": "rsync", "log": str(log_path), "heartbeat_at": (timezone.now() - timedelta(minutes=10)).isoformat(), } run.result["rsync"] = {"pid": 999999, "pgid": 999999, "command": ["rsync"]} run.save(update_fields=["status", "started_at", "result"]) reconciled = reconcile_running_runs(grace_seconds=300, stale_worker_seconds=24 * 60 * 60) self.assertEqual(reconciled, 1) run.refresh_from_db() self.assertEqual(run.status, BackupRun.Status.FAILED) self.assertEqual(run.result["failure"]["category"], "rsync_process") self.assertEqual(run.rsync_exit_code, 255) def test_worker_does_not_reconcile_missing_rsync_process_during_finalizing_phase(self) -> None: host = HostConfig.objects.create(host="web-01", address="web-01.example.test") run = queue_backup_run(host=host) run.status = BackupRun.Status.RUNNING run.started_at = timezone.now() - timedelta(minutes=10) run.result["execution"] = { "phase": "finalizing", "heartbeat_at": (timezone.now() - timedelta(minutes=10)).isoformat(), } run.result["rsync"] = {"pid": 999999, "pgid": 999999, "command": ["rsync"], "exit_code": 0} run.save(update_fields=["status", "started_at", "result"]) reconciled = reconcile_running_runs(grace_seconds=300, stale_worker_seconds=24 * 60 * 60) self.assertEqual(reconciled, 0) run.refresh_from_db() self.assertEqual(run.status, BackupRun.Status.RUNNING) def test_worker_does_not_fail_real_run_for_vanished_file_warning_log(self) -> None: with TemporaryDirectory() as tmp: host = HostConfig.objects.create(host="web-01", address="web-01.example.test") run = queue_backup_run(host=host) log_path = Path(tmp) / "backups" / host.host / ".incomplete" / "20260519-021500Z__ABCDEFGH" / "meta" / "rsync.log" log_path.parent.mkdir(parents=True, exist_ok=True) log_path.write_text( "file has vanished: \"/var/lib/app/session\"\n" "rsync warning: some files vanished before they could be transferred (code 24) at main.c(1338) [sender=3.4.1]\n", encoding="utf-8", ) run.status = BackupRun.Status.RUNNING run.started_at = timezone.now() run.result["execution"] = {"log": str(log_path)} run.save(update_fields=["status", "started_at", "result"]) reconciled = reconcile_running_runs() self.assertEqual(reconciled, 0) run.refresh_from_db() self.assertEqual(run.status, BackupRun.Status.RUNNING) def test_worker_records_dry_run_log_path_while_running(self) -> None: with TemporaryDirectory() as tmp: GlobalConfig.objects.create(name="default", backup_root=str(Path(tmp) / "backups")) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") run = queue_backup_run(host=host, dry_run=True) with patch("pobsync_backend.backup_runner.run_scheduled") as run_scheduled: def fake_run_scheduled(**kwargs): run.refresh_from_db() self.assertEqual( run.result["execution"]["log"], f"/tmp/pobsync-dryrun/{host.host}/run-{run.id}/rsync.log", ) return { "ok": True, "dry_run": True, "host": host.host, "base": None, "log": run.result["execution"]["log"], "rsync": {"exit_code": 0}, } run_scheduled.side_effect = fake_run_scheduled count = Command()._run_once(prefix=Path(tmp) / "home") self.assertEqual(count, 1) run.refresh_from_db() self.assertEqual(run.status, BackupRun.Status.SUCCESS) self.assertEqual(run.result["log"], f"/tmp/pobsync-dryrun/{host.host}/run-{run.id}/rsync.log") def test_worker_preserves_cancelled_status_from_running_run(self) -> None: with TemporaryDirectory() as tmp: GlobalConfig.objects.create(name="default", backup_root=str(Path(tmp) / "backups")) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") run = queue_backup_run(host=host, dry_run=True) with patch("pobsync_backend.backup_runner.run_scheduled") as run_scheduled: def fake_run_scheduled(**kwargs): BackupRun.objects.filter(id=run.id).update(status=BackupRun.Status.CANCELLED) self.assertTrue(kwargs["cancel_check"]()) return { "ok": False, "dry_run": True, "cancelled": True, "host": host.host, "base": None, "log": f"/tmp/pobsync-dryrun/{host.host}/run-{run.id}/rsync.log", "rsync": {"exit_code": 130}, } run_scheduled.side_effect = fake_run_scheduled count = Command()._run_once(prefix=Path(tmp) / "home") self.assertEqual(count, 1) run.refresh_from_db() self.assertEqual(run.status, BackupRun.Status.CANCELLED) self.assertEqual(run.rsync_exit_code, 130) def test_worker_returns_zero_without_queued_runs(self) -> None: count = Command()._run_once(prefix=Path("/opt/pobsync")) self.assertEqual(count, 0) def test_worker_reconciles_running_dry_run_with_terminal_broken_pipe_log(self) -> None: host = HostConfig.objects.create(host="web-01", address="web-01.example.test") run = queue_backup_run(host=host, dry_run=True) log_path = Path("/tmp/pobsync-dryrun/web-01/run-test-broken-pipe.log") log_path.parent.mkdir(parents=True, exist_ok=True) log_path.write_text( "rsync error: unexplained error (code 255) at rsync.c(716) [generator=3.4.1]\n" "rsync: [generator] write error: Broken pipe (32)\n", encoding="utf-8", ) run.status = BackupRun.Status.RUNNING run.started_at = timezone.now() run.result["execution"] = {"log": str(log_path)} run.save(update_fields=["status", "started_at", "result"]) reconciled = reconcile_running_runs() self.assertEqual(reconciled, 1) run.refresh_from_db() self.assertEqual(run.status, BackupRun.Status.FAILED) self.assertEqual(run.rsync_exit_code, 255) self.assertEqual(run.result["failure"]["category"], "transport") self.assertIn("Broken pipe", "\n".join(run.result["rsync"]["log_tail"])) def test_worker_reconciles_stale_running_dry_run_after_timeout(self) -> None: host = HostConfig.objects.create(host="web-01", address="web-01.example.test") run = queue_backup_run(host=host, dry_run=True) run.status = BackupRun.Status.RUNNING run.started_at = timezone.now() - timedelta(seconds=1300) run.result["timeout_seconds"] = 900 run.save(update_fields=["status", "started_at", "result"]) reconciled = reconcile_running_runs(grace_seconds=300) self.assertEqual(reconciled, 1) run.refresh_from_db() self.assertEqual(run.status, BackupRun.Status.FAILED) self.assertEqual(run.rsync_exit_code, 124) self.assertEqual(run.result["failure"]["category"], "timeout")