# rsync exit status for "partial transfer due to vanished source files".
RSYNC_PARTIAL_VANISHED_EXIT_CODE = 24


def classify_rsync_warning(exit_code: int | None, log_tail: list[str]) -> dict[str, str] | None:
    """Classify an rsync outcome that should be kept as a warning, not a failure.

    Args:
        exit_code: The rsync exit status, or ``None`` when it is not known.
        log_tail: The last lines of the rsync log.

    Returns:
        A ``{"category", "message", "hint"}`` mapping when the outcome is the
        "source files vanished" condition, otherwise ``None``.  The condition
        is recognised either by exit code 24 or, when the exit code is
        unknown, by a vanished-file marker in the log tail.  Any other known
        exit code is never treated as a warning, whatever the log says.
    """
    vanished = exit_code == RSYNC_PARTIAL_VANISHED_EXIT_CODE
    if exit_code is None:
        # Without an exit code the log text is the only evidence available.
        markers = (
            "file has vanished",
            # Wording of rsync's code-24 summary line.
            "vanished before they could be transferred",
            # Kept for backward compatibility with the previously matched phrase.
            "vanished before it could be transferred",
        )
        joined_tail = "\n".join(log_tail).lower()
        vanished = any(marker in joined_tail for marker in markers)
    if not vanished:
        return None
    return {
        "category": "vanished",
        "message": "Some source files vanished during rsync.",
        "hint": "This is common on live systems. The snapshot was kept, but review the rsync log if this happens often.",
    }
def _record_running_state(run_id: int, state: dict[str, object]) -> None:
    """Store the live state reported by an in-progress backup on its BackupRun row.

    Copies the snapshot path, log path and rsync details from ``state`` into
    the run's ``result`` document, and stamps the ``execution`` section with
    the current worker pid, hostname and a fresh ``heartbeat_at`` timestamp.
    Does nothing when the run no longer exists or is not in RUNNING status.
    """

    def _dict_or_empty(value: object) -> dict:
        # Anything that is not a dict is treated as "no data".
        return value if isinstance(value, dict) else {}

    try:
        run = BackupRun.objects.only("id", "status", "result", "snapshot_path", "rsync_exit_code").get(id=run_id)
    except BackupRun.DoesNotExist:
        return
    if run.status != BackupRun.Status.RUNNING:
        return

    result = _dict_or_empty(run.result)
    previous_execution = _dict_or_empty(result.get("execution"))
    previous_rsync = _dict_or_empty(result.get("rsync"))
    reported_rsync = _dict_or_empty(state.get("rsync"))

    # Only non-empty string paths are accepted from the callback payload.
    reported_paths = {}
    reported_log = state.get("log")
    if isinstance(reported_log, str) and reported_log:
        reported_paths["log"] = reported_log
    reported_snapshot = state.get("snapshot")
    if isinstance(reported_snapshot, str) and reported_snapshot:
        reported_paths["snapshot"] = reported_snapshot
        run.snapshot_path = reported_snapshot

    if reported_rsync:
        # Newly reported rsync fields win over what was stored before.
        result["rsync"] = {**previous_rsync, **reported_rsync}
        reported_exit_code = reported_rsync.get("exit_code")
        if isinstance(reported_exit_code, int):
            run.rsync_exit_code = reported_exit_code

    result["execution"] = {
        **previous_execution,
        **reported_paths,
        "worker_pid": os.getpid(),
        "worker_host": socket.gethostname(),
        "heartbeat_at": timezone.now().isoformat(),
    }
    run.result = result
    run.save(update_fields=["snapshot_path", "rsync_exit_code", "result"])
def _terminal_rsync_log(log_tail: list[str]) -> bool:
    """Tell whether the log tail shows that rsync ended with an error.

    A tail that classifies as a vanished-files warning is never terminal.
    Otherwise the tail is terminal when any line starts with ``rsync error:``.
    """
    parsed_exit_code = _exit_code_from_log(log_tail)
    if classify_rsync_warning(parsed_exit_code, log_tail) is not None:
        return False
    for entry in log_tail:
        if entry.startswith("rsync error:"):
            return True
    return False
def test_worker_records_real_run_log_path_while_running(self) -> None:
    """The state callback persists snapshot, log and rsync command on the run row.

    The fake ``run_scheduled`` only captures what the row looks like right
    after the callback fires; the assertions run after the worker returns.
    ``execute_backup_run`` calls ``run_scheduled`` under ``except Exception``,
    so an AssertionError raised inside the fake could be recorded as a failed
    run instead of failing this test.
    """
    with TemporaryDirectory() as tmp:
        GlobalConfig.objects.create(name="default", backup_root=str(Path(tmp) / "backups"))
        host = HostConfig.objects.create(host="web-01", address="web-01.example.test")
        run = queue_backup_run(host=host)
        snapshot_dir = Path(tmp) / "backups" / host.host / ".incomplete" / "20260519-021500Z__ABCDEFGH"
        log_path = snapshot_dir / "meta" / "rsync.log"
        observed: dict[str, object] = {}

        def fake_run_scheduled(**kwargs):
            kwargs["state_callback"](
                {
                    "status": "running",
                    "snapshot": str(snapshot_dir),
                    "log": str(log_path),
                    "rsync": {"command": ["rsync"], "exit_code": None},
                }
            )
            # Snapshot the row as it looks while the backup is still running.
            run.refresh_from_db()
            observed["snapshot_path"] = run.snapshot_path
            observed["result"] = run.result
            return {
                "ok": True,
                "dry_run": False,
                "host": host.host,
                "snapshot": "",
                "base": None,
                "rsync": {"exit_code": 0},
            }

        with patch("pobsync_backend.backup_runner.run_scheduled") as run_scheduled:
            run_scheduled.side_effect = fake_run_scheduled
            Command()._run_once(prefix=Path(tmp) / "home")

        # An empty capture means the fake never ran or failed before recording.
        self.assertIn("result", observed)
        self.assertEqual(observed["snapshot_path"], str(snapshot_dir))
        running_result = observed["result"]
        self.assertEqual(running_result["execution"]["log"], str(log_path))
        self.assertEqual(running_result["execution"]["snapshot"], str(snapshot_dir))
        self.assertEqual(running_result["rsync"]["command"], ["rsync"])
def test_worker_does_not_fail_real_run_for_vanished_file_warning_log(self) -> None:
    """A running real backup whose log only holds the code-24 warning is left RUNNING."""
    with TemporaryDirectory() as workdir:
        host = HostConfig.objects.create(host="web-01", address="web-01.example.test")
        run = queue_backup_run(host=host)

        meta_dir = Path(workdir) / "backups" / host.host / ".incomplete" / "20260519-021500Z__ABCDEFGH" / "meta"
        meta_dir.mkdir(parents=True, exist_ok=True)
        rsync_log = meta_dir / "rsync.log"
        rsync_log.write_text(
            "file has vanished: \"/var/lib/app/session\"\n"
            "rsync warning: some files vanished before they could be transferred (code 24) at main.c(1338) [sender=3.4.1]\n",
            encoding="utf-8",
        )

        # Put the queued run into the RUNNING state and point it at the log.
        run.status = BackupRun.Status.RUNNING
        run.started_at = timezone.now()
        run.result["execution"] = {"log": str(rsync_log)}
        run.save(update_fields=["status", "started_at", "result"])

        reconciled_count = reconcile_running_runs()

        self.assertEqual(reconciled_count, 0)
        run.refresh_from_db()
        self.assertEqual(run.status, BackupRun.Status.RUNNING)
def test_real_run_keeps_snapshot_with_warning_for_vanished_files(self) -> None:
    """An rsync exit code of 24 finalises the snapshot and reports status "warning"."""

    def fake_run_rsync(command, log_path, timeout_seconds, cancel_check=None):
        # Emulate rsync: write the vanished-file log, produce a payload, exit 24.
        log_path.write_text(
            "file has vanished: \"/var/lib/app/session\"\n"
            "rsync warning: some files vanished before they could be transferred (code 24) at main.c(1338) [sender=3.4.1]\n",
            encoding="utf-8",
        )
        payload_dir = log_path.parent.parent / "data"
        payload_dir.mkdir(parents=True, exist_ok=True)
        payload_file = payload_dir / "payload.txt"
        payload_file.write_text("payload", encoding="utf-8")
        return RsyncResult(exit_code=24, command=command)

    with TemporaryDirectory() as workdir:
        root = Path(workdir)
        config = FakeConfigSource(backup_root=str(root / "backups"))
        with patch("pobsync.commands.run_scheduled.run_rsync", side_effect=fake_run_rsync):
            result = run_scheduled(
                prefix=root / "home",
                host="web-01",
                dry_run=False,
                config_source=config,
            )

        snapshot = Path(result["snapshot"])
        # Must be checked while the temporary directory still exists.
        self.assertTrue((snapshot / "data" / "payload.txt").exists())

    self.assertTrue(result["ok"])
    self.assertEqual(result["status"], "warning")
    self.assertEqual(result["rsync"]["exit_code"], 24)
    self.assertEqual(result["warning"]["category"], "vanished")
    self.assertEqual(snapshot.parent.name, "scheduled")
    tail_text = "\n".join(result["rsync"]["log_tail"])
    self.assertIn("file has vanished", tail_text)