From 6eb1b4add38e3ceb491cd3ab73ded2458127ce8c Mon Sep 17 00:00:00 2001 From: Peter van Arkel Date: Sat, 23 May 2026 00:23:14 +0200 Subject: [PATCH 1/3] (bugfix) Reconcile real rsync failures from worker logs Record live rsync log paths for normal backup runs so the worker can recover stale running state after terminal rsync errors. Treat rsync vanished-file exit code 24 as a warning and keep the completed snapshot instead of failing the run into incomplete state. Closes #54 --- src/pobsync/commands/run_scheduled.py | 43 ++++++- src/pobsync_backend/backup_runner.py | 81 ++++++++++++- .../tests/test_backup_worker.py | 114 ++++++++++++++++++ .../tests/test_run_scheduled_config_source.py | 56 +++++++++ 4 files changed, 285 insertions(+), 9 deletions(-) diff --git a/src/pobsync/commands/run_scheduled.py b/src/pobsync/commands/run_scheduled.py index 1cd76c9..665f16c 100644 --- a/src/pobsync/commands/run_scheduled.py +++ b/src/pobsync/commands/run_scheduled.py @@ -23,6 +23,7 @@ from ..util import ensure_dir, realpath_startswith, sanitize_host, write_yaml_at DEFAULT_DRY_RUN_TIMEOUT_SECONDS = 900 +RSYNC_PARTIAL_VANISHED_EXIT_CODE = 24 def dry_run_log_path(host: str, run_id: int | None = None) -> Path: @@ -72,6 +73,24 @@ def classify_rsync_failure(exit_code: int | None, log_tail: list[str]) -> dict[s } +def classify_rsync_warning(exit_code: int | None, log_tail: list[str]) -> dict[str, str] | None: + joined_tail = "\n".join(log_tail).lower() + if exit_code == RSYNC_PARTIAL_VANISHED_EXIT_CODE: + return { + "category": "vanished", + "message": "Some source files vanished during rsync.", + "hint": "This is common on live systems. The snapshot was kept, but review the rsync log if this happens often.", + } + if exit_code in (None, RSYNC_PARTIAL_VANISHED_EXIT_CODE) and ( + "file has vanished" in joined_tail or "vanished before it could be transferred" in joined_tail + ): + return { + "category": "vanished", + "message": "Some source files vanished during rsync.", + "hint": "This is common on live systems. The snapshot was kept, but review the rsync log if this happens often.", + } + return None + def _collect_run_stats( *, log_path: Path, @@ -158,6 +177,7 @@ def run_scheduled( run_id: int | None = None, cancel_check: Callable[[], bool] | None = None, verbose_output: bool = False, + state_callback: Callable[[dict[str, Any]], None] | None = None, ) -> dict[str, Any]: host = sanitize_host(host) @@ -322,14 +342,28 @@ def run_scheduled( log_path.touch(exist_ok=True) write_yaml_atomic(meta_path, meta) + if state_callback is not None: + state_callback( + { + "status": "running", + "snapshot": str(incomplete_dir), + "log": str(log_path), + "rsync": {"command": cmd, "exit_code": None}, + } + ) result = run_rsync(cmd, log_path=log_path, timeout_seconds=timeout_seconds, cancel_check=cancel_check) + log_tail = _read_log_tail(log_path) + warning = classify_rsync_warning(result.exit_code, log_tail) + successful_or_warning = result.exit_code == 0 or warning is not None end_ts = utc_now() meta["ended_at"] = format_iso_z(end_ts) meta["duration_seconds"] = int((end_ts - ts).total_seconds()) meta["rsync"]["exit_code"] = result.exit_code - meta["status"] = "cancelled" if result.cancelled else ("success" if result.exit_code == 0 else "failed") + meta["status"] = "cancelled" if result.cancelled else ("warning" if warning else ("success" if result.exit_code == 0 else "failed")) + if warning is not None: + meta["warning"] = warning meta["stats"] = _collect_run_stats( log_path=log_path, backup_root=Path(backup_root), @@ -349,8 +383,7 @@ def run_scheduled( "error": "rsync.log missing after execution", } - if result.exit_code != 0: - log_tail = _read_log_tail(log_path) + if not successful_or_warning: return { "ok": False, "dry_run": False, @@ -404,7 +437,9 @@ def run_scheduled( "snapshot": str(final_dir), "base": str(base_dir) if base_dir else None, "log": str(final_log_path), - "rsync": {"exit_code": result.exit_code}, + "status": meta["status"], + "warning": warning, + "rsync": {"exit_code": result.exit_code, "log_tail": log_tail}, "verbose_output": bool(verbose_output), "duration_seconds": meta["duration_seconds"], "stats": meta["stats"], diff --git a/src/pobsync_backend/backup_runner.py b/src/pobsync_backend/backup_runner.py index f9975fb..dece829 100644 --- a/src/pobsync_backend/backup_runner.py +++ b/src/pobsync_backend/backup_runner.py @@ -8,7 +8,13 @@ from pathlib import Path from django.db import transaction from django.utils import timezone -from pobsync.commands.run_scheduled import DEFAULT_DRY_RUN_TIMEOUT_SECONDS, classify_rsync_failure, dry_run_log_path, run_scheduled +from pobsync.commands.run_scheduled import ( + DEFAULT_DRY_RUN_TIMEOUT_SECONDS, + classify_rsync_failure, + classify_rsync_warning, + dry_run_log_path, + run_scheduled, +) from pobsync_backend.config_source import DjangoConfigSource from pobsync_backend.models import BackupRun, HostConfig from pobsync_backend.retention import run_sql_retention_apply @@ -66,6 +72,7 @@ def execute_backup_run( run_id=run.id, cancel_check=lambda: _run_cancel_requested(run.id), verbose_output=bool(dry_run or verbose_output), + state_callback=lambda state: _record_running_state(run.id, state), ) except Exception as exc: run.refresh_from_db() @@ -83,6 +90,8 @@ def execute_backup_run( run.refresh_from_db() if result.get("cancelled") or run.status == BackupRun.Status.CANCELLED: run.status = BackupRun.Status.CANCELLED + elif result.get("status") == BackupRun.Status.WARNING: + run.status = BackupRun.Status.WARNING else: run.status = BackupRun.Status.SUCCESS if result.get("ok") else BackupRun.Status.FAILED run.ended_at = timezone.now() @@ -201,11 +210,71 @@ def _run_cancel_requested(run_id: int) -> bool: return False +def _record_running_state(run_id: int, state: dict[str, object]) -> None: + try: + run = BackupRun.objects.only("id", "status", "result", "snapshot_path", "rsync_exit_code").get(id=run_id) + except BackupRun.DoesNotExist: + return + if run.status != BackupRun.Status.RUNNING: + return + + result = run.result if isinstance(run.result, dict) else {} + execution = result.get("execution") if isinstance(result.get("execution"), dict) else {} + rsync = result.get("rsync") if isinstance(result.get("rsync"), dict) else {} + incoming_rsync = state.get("rsync") if isinstance(state.get("rsync"), dict) else {} + + log_path = state.get("log") + snapshot_path = state.get("snapshot") + if isinstance(log_path, str) and log_path: + execution["log"] = log_path + if isinstance(snapshot_path, str) and snapshot_path: + execution["snapshot"] = snapshot_path + run.snapshot_path = snapshot_path + if incoming_rsync: + result["rsync"] = {**rsync, **incoming_rsync} + exit_code = incoming_rsync.get("exit_code") + if isinstance(exit_code, int): + run.rsync_exit_code = exit_code + result["execution"] = { + **execution, + "worker_pid": os.getpid(), + "worker_host": socket.gethostname(), + "heartbeat_at": timezone.now().isoformat(), + } + run.result = result + run.save(update_fields=["snapshot_path", "rsync_exit_code", "result"]) + + def _reconcile_running_run(*, run: BackupRun, grace_seconds: int, stale_worker_seconds: int) -> bool: result = run.result if isinstance(run.result, dict) else {} requested = result.get("requested") if isinstance(result.get("requested"), dict) else {} + log_path = _execution_log_path(result) + log_tail = _read_log_tail(log_path) if log_path is not None else [] + terminal_log = _terminal_rsync_log(log_tail) + exit_code = _exit_code_from_log(log_tail) stale_worker = _running_worker_timed_out(run=run, stale_worker_seconds=stale_worker_seconds) if not requested.get("dry_run"): + if terminal_log: + failure = classify_rsync_failure(exit_code or 255, log_tail) + result.update( + { + "ok": False, + "host": run.host.host, + "log": str(log_path) if log_path else "", + "failure": failure, + "rsync": { + **(result.get("rsync") if isinstance(result.get("rsync"), dict) else {}), + "exit_code": exit_code or 255, + "log_tail": log_tail, + }, + } + ) + run.status = BackupRun.Status.FAILED + run.ended_at = timezone.now() + run.rsync_exit_code = exit_code or 255 + run.result = result + run.save(update_fields=["status", "ended_at", "rsync_exit_code", "result"]) + return True if stale_worker: result.update( { @@ -225,14 +294,11 @@ def _reconcile_running_run(*, run: BackupRun, grace_seconds: int, stale_worker_s return True return False - log_path = _execution_log_path(result) - log_tail = _read_log_tail(log_path) if log_path is not None else [] - terminal_log = _terminal_rsync_log(log_tail) timed_out = _running_dry_run_timed_out(run=run, grace_seconds=grace_seconds) if not terminal_log and not timed_out and not stale_worker: return False - exit_code = _exit_code_from_log(log_tail) or (124 if timed_out or stale_worker else 255) + exit_code = exit_code or (124 if timed_out or stale_worker else 255) failure = classify_rsync_failure(exit_code, log_tail) if stale_worker and not terminal_log: failure = { @@ -305,6 +371,9 @@ def _read_log_tail(log_path: Path | None, *, max_lines: int = 40) -> list[str]: def _terminal_rsync_log(log_tail: list[str]) -> bool: + warning = classify_rsync_warning(_exit_code_from_log(log_tail), log_tail) + if warning is not None: + return False return any(line.startswith("rsync error:") for line in log_tail) @@ -312,6 +381,8 @@ def _exit_code_from_log(log_tail: list[str]) -> int | None: for line in reversed(log_tail): if "code 255" in line: return 255 + if "code 24" in line: + return 24 if "code 124" in line: return 124 if "code 12" in line: diff --git a/src/pobsync_backend/tests/test_backup_worker.py b/src/pobsync_backend/tests/test_backup_worker.py index 99db08d..71536bb 100644 --- a/src/pobsync_backend/tests/test_backup_worker.py +++ b/src/pobsync_backend/tests/test_backup_worker.py @@ -85,6 +85,37 @@ class BackupWorkerTests(TestCase): self.assertEqual(SnapshotRecord.objects.count(), 1) self.assertEqual(run.snapshot, SnapshotRecord.objects.get()) + def test_worker_records_warning_status_from_completed_run(self) -> None: + with TemporaryDirectory() as tmp: + backup_root = Path(tmp) / "backups" + GlobalConfig.objects.create(name="default", backup_root=str(backup_root)) + host = HostConfig.objects.create(host="web-01", address="web-01.example.test") + snapshot_dir = backup_root / host.host / "scheduled" / "20260519-021500Z__ABCDEFGH" + meta_dir = snapshot_dir / "meta" + meta_dir.mkdir(parents=True) + write_yaml_atomic(meta_dir / "meta.yaml", {"status": "warning", "started_at": "2026-05-19T02:15:00Z"}) + run = queue_backup_run(host=host) + + with patch("pobsync_backend.backup_runner.run_scheduled") as run_scheduled: + run_scheduled.return_value = { + "ok": True, + "status": "warning", + "dry_run": False, + "host": host.host, + "snapshot": str(snapshot_dir), + "base": None, + "warning": {"category": "vanished"}, + "rsync": {"exit_code": 24}, + } + + count = Command()._run_once(prefix=Path(tmp) / "home") + + self.assertEqual(count, 1) + run.refresh_from_db() + self.assertEqual(run.status, BackupRun.Status.WARNING) + self.assertEqual(run.rsync_exit_code, 24) + self.assertEqual(run.result["warning"]["category"], "vanished") + def test_worker_refreshes_heartbeat_while_run_is_active(self) -> None: with TemporaryDirectory() as tmp: GlobalConfig.objects.create(name="default", backup_root=str(Path(tmp) / "backups")) @@ -116,6 +147,41 @@ class BackupWorkerTests(TestCase): run_scheduled.side_effect = fake_run_scheduled Command()._run_once(prefix=Path(tmp) / "home") + def test_worker_records_real_run_log_path_while_running(self) -> None: + with TemporaryDirectory() as tmp: + GlobalConfig.objects.create(name="default", backup_root=str(Path(tmp) / "backups")) + host = HostConfig.objects.create(host="web-01", address="web-01.example.test") + run = queue_backup_run(host=host) + snapshot_dir = Path(tmp) / "backups" / host.host / ".incomplete" / "20260519-021500Z__ABCDEFGH" + log_path = snapshot_dir / "meta" / "rsync.log" + + with patch("pobsync_backend.backup_runner.run_scheduled") as run_scheduled: + def fake_run_scheduled(**kwargs): + kwargs["state_callback"]( + { + "status": "running", + "snapshot": str(snapshot_dir), + "log": str(log_path), + "rsync": {"command": ["rsync"], "exit_code": None}, + } + ) + run.refresh_from_db() + self.assertEqual(run.snapshot_path, str(snapshot_dir)) + self.assertEqual(run.result["execution"]["log"], str(log_path)) + self.assertEqual(run.result["execution"]["snapshot"], str(snapshot_dir)) + self.assertEqual(run.result["rsync"]["command"], ["rsync"]) + return { + "ok": True, + "dry_run": False, + "host": host.host, + "snapshot": "", + "base": None, + "rsync": {"exit_code": 0}, + } + + run_scheduled.side_effect = fake_run_scheduled + Command()._run_once(prefix=Path(tmp) / "home") + def test_worker_reconciles_stale_real_run_after_heartbeat_timeout(self) -> None: host = HostConfig.objects.create(host="web-01", address="web-01.example.test") run = queue_backup_run(host=host) @@ -136,6 +202,54 @@ class BackupWorkerTests(TestCase): self.assertEqual(run.result["failure"]["category"], "worker") self.assertIn("heartbeat stopped", run.result["failure"]["message"]) + def test_worker_reconciles_real_run_with_terminal_broken_pipe_log(self) -> None: + with TemporaryDirectory() as tmp: + host = HostConfig.objects.create(host="web-01", address="web-01.example.test") + run = queue_backup_run(host=host) + log_path = Path(tmp) / "backups" / host.host / ".incomplete" / "20260519-021500Z__ABCDEFGH" / "meta" / "rsync.log" + log_path.parent.mkdir(parents=True, exist_ok=True) + log_path.write_text( + "rsync error: unexplained error (code 255) at rsync.c(716) [generator=3.4.1]\n" + "rsync error: received SIGUSR1 (code 19) at main.c(1600) [receiver=3.4.1]\n" + "rsync: [generator] write error: Broken pipe (32)\n", + encoding="utf-8", + ) + run.status = BackupRun.Status.RUNNING + run.started_at = timezone.now() + run.result["execution"] = {"log": str(log_path)} + run.save(update_fields=["status", "started_at", "result"]) + + reconciled = reconcile_running_runs() + + self.assertEqual(reconciled, 1) + run.refresh_from_db() + self.assertEqual(run.status, BackupRun.Status.FAILED) + self.assertEqual(run.rsync_exit_code, 255) + self.assertEqual(run.result["failure"]["category"], "transport") + self.assertIn("Broken pipe", "\n".join(run.result["rsync"]["log_tail"])) + + def test_worker_does_not_fail_real_run_for_vanished_file_warning_log(self) -> None: + with TemporaryDirectory() as tmp: + host = HostConfig.objects.create(host="web-01", address="web-01.example.test") + run = queue_backup_run(host=host) + log_path = Path(tmp) / "backups" / host.host / ".incomplete" / "20260519-021500Z__ABCDEFGH" / "meta" / "rsync.log" + log_path.parent.mkdir(parents=True, exist_ok=True) + log_path.write_text( + "file has vanished: \"/var/lib/app/session\"\n" + "rsync warning: some files vanished before they could be transferred (code 24) at main.c(1338) [sender=3.4.1]\n", + encoding="utf-8", + ) + run.status = BackupRun.Status.RUNNING + run.started_at = timezone.now() + run.result["execution"] = {"log": str(log_path)} + run.save(update_fields=["status", "started_at", "result"]) + + reconciled = reconcile_running_runs() + + self.assertEqual(reconciled, 0) + run.refresh_from_db() + self.assertEqual(run.status, BackupRun.Status.RUNNING) + def test_worker_records_dry_run_log_path_while_running(self) -> None: with TemporaryDirectory() as tmp: GlobalConfig.objects.create(name="default", backup_root=str(Path(tmp) / "backups")) diff --git a/src/pobsync_backend/tests/test_run_scheduled_config_source.py b/src/pobsync_backend/tests/test_run_scheduled_config_source.py index fce2dc1..0285143 100644 --- a/src/pobsync_backend/tests/test_run_scheduled_config_source.py +++ b/src/pobsync_backend/tests/test_run_scheduled_config_source.py @@ -256,6 +256,62 @@ class RunScheduledConfigSourceTests(SimpleTestCase): self.assertIn("stats:", meta_text) self.assertIn("files_total: 10", meta_text) + def test_real_run_reports_running_state_callback_before_rsync_returns(self) -> None: + states = [] + + def fake_run_rsync(command, log_path, timeout_seconds, cancel_check=None): + self.assertEqual(len(states), 1) + self.assertEqual(states[0]["status"], "running") + self.assertEqual(states[0]["log"], str(log_path)) + self.assertEqual(states[0]["rsync"]["command"], command) + log_path.write_text("Number of files: 1\n", encoding="utf-8") + return RsyncResult(exit_code=0, command=command) + + with TemporaryDirectory() as tmp: + with patch("pobsync.commands.run_scheduled.run_rsync", side_effect=fake_run_rsync): + run_scheduled( + prefix=Path(tmp) / "home", + host="web-01", + dry_run=False, + config_source=FakeConfigSource(backup_root=str(Path(tmp) / "backups")), + state_callback=states.append, + ) + + self.assertEqual(len(states), 1) + self.assertIn("/.incomplete/", states[0]["snapshot"]) + + def test_real_run_keeps_snapshot_with_warning_for_vanished_files(self) -> None: + def fake_run_rsync(command, log_path, timeout_seconds, cancel_check=None): + log_path.write_text( + "file has vanished: \"/var/lib/app/session\"\n" + "rsync warning: some files vanished before they could be transferred (code 24) at main.c(1338) [sender=3.4.1]\n", + encoding="utf-8", + ) + data_dir = log_path.parent.parent / "data" + data_dir.mkdir(parents=True, exist_ok=True) + (data_dir / "payload.txt").write_text("payload", encoding="utf-8") + return RsyncResult(exit_code=24, command=command) + + with TemporaryDirectory() as tmp: + backup_root = Path(tmp) / "backups" + with patch("pobsync.commands.run_scheduled.run_rsync", side_effect=fake_run_rsync): + result = run_scheduled( + prefix=Path(tmp) / "home", + host="web-01", + dry_run=False, + config_source=FakeConfigSource(backup_root=str(backup_root)), + ) + + snapshot = Path(result["snapshot"]) + self.assertTrue((snapshot / "data" / "payload.txt").exists()) + + self.assertTrue(result["ok"]) + self.assertEqual(result["status"], "warning") + self.assertEqual(result["rsync"]["exit_code"], 24) + self.assertEqual(result["warning"]["category"], "vanished") + self.assertEqual(snapshot.parent.name, "scheduled") + self.assertIn("file has vanished", "\n".join(result["rsync"]["log_tail"])) + def test_dry_run_reports_cancelled_rsync(self) -> None: def fake_run_rsync(command, log_path, timeout_seconds, cancel_check=None): self.assertTrue(cancel_check()) From 28da9c4096c39ae17cd4de16a2c4157d67b27a4f Mon Sep 17 00:00:00 2001 From: Peter van Arkel Date: Sat, 23 May 2026 00:26:22 +0200 Subject: [PATCH 2/3] (bugfix) Track rsync process state for running backups Record rsync process pid and execution phase while normal backup runs are active so the worker can reconcile stale running rows when rsync has already disappeared. Keep finalizing runs out of the missing-process path to avoid marking slow post-rsync stats collection as a failed transfer. Closes #54 --- src/pobsync/commands/run_scheduled.py | 32 ++++++++++- src/pobsync/rsync.py | 3 ++ src/pobsync_backend/backup_runner.py | 54 +++++++++++++++++++ .../tests/test_backup_worker.py | 48 ++++++++++++++++- .../tests/test_run_scheduled_config_source.py | 13 ++++- 5 files changed, 146 insertions(+), 4 deletions(-) diff --git a/src/pobsync/commands/run_scheduled.py b/src/pobsync/commands/run_scheduled.py index 665f16c..2268ed5 100644 --- a/src/pobsync/commands/run_scheduled.py +++ b/src/pobsync/commands/run_scheduled.py @@ -346,16 +346,46 @@ def run_scheduled( state_callback( { "status": "running", + "phase": "preparing", "snapshot": str(incomplete_dir), "log": str(log_path), "rsync": {"command": cmd, "exit_code": None}, } ) - result = run_rsync(cmd, log_path=log_path, timeout_seconds=timeout_seconds, cancel_check=cancel_check) + def process_started(pid: int, pgid: int) -> None: + if state_callback is None: + return + state_callback( + { + "status": "running", + "phase": "rsync", + "snapshot": str(incomplete_dir), + "log": str(log_path), + "rsync": {"command": cmd, "exit_code": None, "pid": pid, "pgid": pgid}, + } + ) + + result = run_rsync( + cmd, + log_path=log_path, + timeout_seconds=timeout_seconds, + cancel_check=cancel_check, + process_started=process_started if state_callback is not None else None, + ) log_tail = _read_log_tail(log_path) warning = classify_rsync_warning(result.exit_code, log_tail) successful_or_warning = result.exit_code == 0 or warning is not None + if state_callback is not None: + state_callback( + { + "status": "running", + "phase": "finalizing", + "snapshot": str(incomplete_dir), + "log": str(log_path), + "rsync": {"command": cmd, "exit_code": result.exit_code, "log_tail": log_tail}, + } + ) end_ts = utc_now() meta["ended_at"] = format_iso_z(end_ts) diff --git a/src/pobsync/rsync.py b/src/pobsync/rsync.py index 2f0acb0..32fa2f6 100644 --- a/src/pobsync/rsync.py +++ b/src/pobsync/rsync.py @@ -82,6 +82,7 @@ def run_rsync( log_path: Path, timeout_seconds: int, cancel_check: Callable[[], bool] | None = None, + process_started: Callable[[int, int], None] | None = None, ) -> RsyncResult: """ Run rsync and always write stdout/stderr to log_path. @@ -95,6 +96,8 @@ def run_rsync( with log_path.open("ab") as f: process = subprocess.Popen(command, stdout=f, stderr=subprocess.STDOUT, start_new_session=True) + if process_started is not None: + process_started(process.pid, os.getpgid(process.pid)) started = time.monotonic() while True: exit_code = process.poll() diff --git a/src/pobsync_backend/backup_runner.py b/src/pobsync_backend/backup_runner.py index dece829..8b7f9ec 100644 --- a/src/pobsync_backend/backup_runner.py +++ b/src/pobsync_backend/backup_runner.py @@ -225,6 +225,9 @@ def _record_running_state(run_id: int, state: dict[str, object]) -> None: log_path = state.get("log") snapshot_path = state.get("snapshot") + phase = state.get("phase") + if isinstance(phase, str) and phase: + execution["phase"] = phase if isinstance(log_path, str) and log_path: execution["log"] = log_path if isinstance(snapshot_path, str) and snapshot_path: @@ -275,6 +278,30 @@ def _reconcile_running_run(*, run: BackupRun, grace_seconds: int, stale_worker_s run.result = result run.save(update_fields=["status", "ended_at", "rsync_exit_code", "result"]) return True + if _running_rsync_process_missing(run=run, grace_seconds=grace_seconds): + result.update( + { + "ok": False, + "host": run.host.host, + "log": str(log_path) if log_path else "", + "failure": { + "category": "rsync_process", + "message": "The rsync process is no longer running while the backup is still marked running.", + "hint": "Check the rsync log and pobsync-worker.service logs before retrying the backup.", + }, + "rsync": { + **(result.get("rsync") if isinstance(result.get("rsync"), dict) else {}), + "exit_code": exit_code or 255, + "log_tail": log_tail, + }, + } + ) + run.status = BackupRun.Status.FAILED + run.ended_at = timezone.now() + run.rsync_exit_code = exit_code or 255 + run.result = result + run.save(update_fields=["status", "ended_at", "rsync_exit_code", "result"]) + return True if stale_worker: result.update( { @@ -413,6 +440,33 @@ def _running_worker_timed_out(*, run: BackupRun, stale_worker_seconds: int) -> b return timezone.now() >= heartbeat_at + timedelta(seconds=stale_worker_seconds) +def _running_rsync_process_missing(*, run: BackupRun, grace_seconds: int) -> bool: + if grace_seconds <= 0: + return False + result = run.result if isinstance(run.result, dict) else {} + execution = result.get("execution") if isinstance(result.get("execution"), dict) else {} + if execution.get("phase") != "rsync": + return False + rsync = result.get("rsync") if isinstance(result.get("rsync"), dict) else {} + pid = rsync.get("pid") + if not isinstance(pid, int) or pid <= 0: + return False + heartbeat_at = _parse_iso_datetime(execution.get("heartbeat_at")) or run.started_at + if heartbeat_at is None or timezone.now() < heartbeat_at + timedelta(seconds=grace_seconds): + return False + return not _process_exists(pid) + + +def _process_exists(pid: int) -> bool: + try: + os.kill(pid, 0) + except ProcessLookupError: + return False + except PermissionError: + return True + return True + + def _parse_iso_datetime(value: object): if not isinstance(value, str) or not value: return None diff --git a/src/pobsync_backend/tests/test_backup_worker.py b/src/pobsync_backend/tests/test_backup_worker.py index 71536bb..084fc53 100644 --- a/src/pobsync_backend/tests/test_backup_worker.py +++ b/src/pobsync_backend/tests/test_backup_worker.py @@ -160,16 +160,19 @@ class BackupWorkerTests(TestCase): kwargs["state_callback"]( { "status": "running", + "phase": "rsync", "snapshot": str(snapshot_dir), "log": str(log_path), - "rsync": {"command": ["rsync"], "exit_code": None}, + "rsync": {"command": ["rsync"], "exit_code": None, "pid": 1234, "pgid": 1234}, } ) run.refresh_from_db() self.assertEqual(run.snapshot_path, str(snapshot_dir)) + self.assertEqual(run.result["execution"]["phase"], "rsync") self.assertEqual(run.result["execution"]["log"], str(log_path)) self.assertEqual(run.result["execution"]["snapshot"], str(snapshot_dir)) self.assertEqual(run.result["rsync"]["command"], ["rsync"]) + self.assertEqual(run.result["rsync"]["pid"], 1234) return { "ok": True, "dry_run": False, @@ -228,6 +231,49 @@ class BackupWorkerTests(TestCase): self.assertEqual(run.result["failure"]["category"], "transport") self.assertIn("Broken pipe", "\n".join(run.result["rsync"]["log_tail"])) + def test_worker_reconciles_real_run_when_rsync_process_disappears(self) -> None: + with TemporaryDirectory() as tmp: + host = HostConfig.objects.create(host="web-01", address="web-01.example.test") + run = queue_backup_run(host=host) + log_path = Path(tmp) / "backups" / host.host / ".incomplete" / "20260519-021500Z__ABCDEFGH" / "meta" / "rsync.log" + log_path.parent.mkdir(parents=True, exist_ok=True) + log_path.write_text("sending incremental file list\n", encoding="utf-8") + run.status = BackupRun.Status.RUNNING + run.started_at = timezone.now() - timedelta(minutes=10) + run.result["execution"] = { + "phase": "rsync", + "log": str(log_path), + "heartbeat_at": (timezone.now() - timedelta(minutes=10)).isoformat(), + } + run.result["rsync"] = {"pid": 999999, "pgid": 999999, "command": ["rsync"]} + run.save(update_fields=["status", "started_at", "result"]) + + reconciled = reconcile_running_runs(grace_seconds=300, stale_worker_seconds=24 * 60 * 60) + + self.assertEqual(reconciled, 1) + run.refresh_from_db() + self.assertEqual(run.status, BackupRun.Status.FAILED) + self.assertEqual(run.result["failure"]["category"], "rsync_process") + self.assertEqual(run.rsync_exit_code, 255) + + def test_worker_does_not_reconcile_missing_rsync_process_during_finalizing_phase(self) -> None: + host = HostConfig.objects.create(host="web-01", address="web-01.example.test") + run = queue_backup_run(host=host) + run.status = BackupRun.Status.RUNNING + run.started_at = timezone.now() - timedelta(minutes=10) + run.result["execution"] = { + "phase": "finalizing", + "heartbeat_at": (timezone.now() - timedelta(minutes=10)).isoformat(), + } + run.result["rsync"] = {"pid": 999999, "pgid": 999999, "command": ["rsync"], "exit_code": 0} + run.save(update_fields=["status", "started_at", "result"]) + + reconciled = reconcile_running_runs(grace_seconds=300, stale_worker_seconds=24 * 60 * 60) + + self.assertEqual(reconciled, 0) + run.refresh_from_db() + self.assertEqual(run.status, BackupRun.Status.RUNNING) + def test_worker_does_not_fail_real_run_for_vanished_file_warning_log(self) -> None: with TemporaryDirectory() as tmp: host = HostConfig.objects.create(host="web-01", address="web-01.example.test") diff --git a/src/pobsync_backend/tests/test_run_scheduled_config_source.py b/src/pobsync_backend/tests/test_run_scheduled_config_source.py index 0285143..117c91d 100644 --- a/src/pobsync_backend/tests/test_run_scheduled_config_source.py +++ b/src/pobsync_backend/tests/test_run_scheduled_config_source.py @@ -259,11 +259,18 @@ class RunScheduledConfigSourceTests(SimpleTestCase): def test_real_run_reports_running_state_callback_before_rsync_returns(self) -> None: states = [] - def fake_run_rsync(command, log_path, timeout_seconds, cancel_check=None): + def fake_run_rsync(command, log_path, timeout_seconds, cancel_check=None, process_started=None): self.assertEqual(len(states), 1) self.assertEqual(states[0]["status"], "running") + self.assertEqual(states[0]["phase"], "preparing") self.assertEqual(states[0]["log"], str(log_path)) self.assertEqual(states[0]["rsync"]["command"], command) + self.assertIsNotNone(process_started) + process_started(1234, 1234) + self.assertEqual(len(states), 2) + self.assertEqual(states[1]["phase"], "rsync") + self.assertEqual(states[1]["rsync"]["pid"], 1234) + self.assertEqual(states[1]["rsync"]["pgid"], 1234) log_path.write_text("Number of files: 1\n", encoding="utf-8") return RsyncResult(exit_code=0, command=command) @@ -277,8 +284,10 @@ class RunScheduledConfigSourceTests(SimpleTestCase): state_callback=states.append, ) - self.assertEqual(len(states), 1) + self.assertEqual(len(states), 3) self.assertIn("/.incomplete/", states[0]["snapshot"]) + self.assertEqual(states[2]["phase"], "finalizing") + self.assertEqual(states[2]["rsync"]["exit_code"], 0) def test_real_run_keeps_snapshot_with_warning_for_vanished_files(self) -> None: def fake_run_rsync(command, log_path, timeout_seconds, cancel_check=None): From 5788f5385417960e03411378eef5df452d768600 Mon Sep 17 00:00:00 2001 From: Peter van Arkel Date: Sat, 23 May 2026 00:31:24 +0200 Subject: [PATCH 3/3] (bugfix) Keep rsync runner callback optional Only pass the process_started hook when live run state tracking is active, so existing rsync call sites and tests without that hook remain compatible. Refs #54 --- src/pobsync/commands/run_scheduled.py | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/src/pobsync/commands/run_scheduled.py b/src/pobsync/commands/run_scheduled.py index 2268ed5..b8acf10 100644 --- a/src/pobsync/commands/run_scheduled.py +++ b/src/pobsync/commands/run_scheduled.py @@ -366,13 +366,14 @@ def run_scheduled( } ) - result = run_rsync( - cmd, - log_path=log_path, - timeout_seconds=timeout_seconds, - cancel_check=cancel_check, - process_started=process_started if state_callback is not None else None, - ) + run_rsync_kwargs: dict[str, Any] = { + "log_path": log_path, + "timeout_seconds": timeout_seconds, + "cancel_check": cancel_check, + } + if state_callback is not None: + run_rsync_kwargs["process_started"] = process_started + result = run_rsync(cmd, **run_rsync_kwargs) log_tail = _read_log_tail(log_path) warning = classify_rsync_warning(result.exit_code, log_tail) successful_or_warning = result.exit_code == 0 or warning is not None