From 28da9c4096c39ae17cd4de16a2c4157d67b27a4f Mon Sep 17 00:00:00 2001 From: Peter van Arkel Date: Sat, 23 May 2026 00:26:22 +0200 Subject: [PATCH] (bugfix) Track rsync process state for running backups Record rsync process pid and execution phase while normal backup runs are active so the worker can reconcile stale running rows when rsync has already disappeared. Keep finalizing runs out of the missing-process path to avoid marking slow post-rsync stats collection as a failed transfer. Closes #54 --- src/pobsync/commands/run_scheduled.py | 32 ++++++++++- src/pobsync/rsync.py | 3 ++ src/pobsync_backend/backup_runner.py | 54 +++++++++++++++++++ .../tests/test_backup_worker.py | 48 ++++++++++++++++- .../tests/test_run_scheduled_config_source.py | 13 ++++- 5 files changed, 146 insertions(+), 4 deletions(-) diff --git a/src/pobsync/commands/run_scheduled.py b/src/pobsync/commands/run_scheduled.py index 665f16c..2268ed5 100644 --- a/src/pobsync/commands/run_scheduled.py +++ b/src/pobsync/commands/run_scheduled.py @@ -346,16 +346,46 @@ def run_scheduled( state_callback( { "status": "running", + "phase": "preparing", "snapshot": str(incomplete_dir), "log": str(log_path), "rsync": {"command": cmd, "exit_code": None}, } ) - result = run_rsync(cmd, log_path=log_path, timeout_seconds=timeout_seconds, cancel_check=cancel_check) + def process_started(pid: int, pgid: int) -> None: + if state_callback is None: + return + state_callback( + { + "status": "running", + "phase": "rsync", + "snapshot": str(incomplete_dir), + "log": str(log_path), + "rsync": {"command": cmd, "exit_code": None, "pid": pid, "pgid": pgid}, + } + ) + + result = run_rsync( + cmd, + log_path=log_path, + timeout_seconds=timeout_seconds, + cancel_check=cancel_check, + process_started=process_started if state_callback is not None else None, + ) log_tail = _read_log_tail(log_path) warning = classify_rsync_warning(result.exit_code, log_tail) successful_or_warning = result.exit_code == 0 or warning is not None + if state_callback is not None: + state_callback( + { + "status": "running", + "phase": "finalizing", + "snapshot": str(incomplete_dir), + "log": str(log_path), + "rsync": {"command": cmd, "exit_code": result.exit_code, "log_tail": log_tail}, + } + ) end_ts = utc_now() meta["ended_at"] = format_iso_z(end_ts) diff --git a/src/pobsync/rsync.py b/src/pobsync/rsync.py index 2f0acb0..32fa2f6 100644 --- a/src/pobsync/rsync.py +++ b/src/pobsync/rsync.py @@ -82,6 +82,7 @@ def run_rsync( log_path: Path, timeout_seconds: int, cancel_check: Callable[[], bool] | None = None, + process_started: Callable[[int, int], None] | None = None, ) -> RsyncResult: """ Run rsync and always write stdout/stderr to log_path. @@ -95,6 +96,8 @@ def run_rsync( with log_path.open("ab") as f: process = subprocess.Popen(command, stdout=f, stderr=subprocess.STDOUT, start_new_session=True) + if process_started is not None: + process_started(process.pid, os.getpgid(process.pid)) started = time.monotonic() while True: exit_code = process.poll() diff --git a/src/pobsync_backend/backup_runner.py b/src/pobsync_backend/backup_runner.py index dece829..8b7f9ec 100644 --- a/src/pobsync_backend/backup_runner.py +++ b/src/pobsync_backend/backup_runner.py @@ -225,6 +225,9 @@ def _record_running_state(run_id: int, state: dict[str, object]) -> None: log_path = state.get("log") snapshot_path = state.get("snapshot") + phase = state.get("phase") + if isinstance(phase, str) and phase: + execution["phase"] = phase if isinstance(log_path, str) and log_path: execution["log"] = log_path if isinstance(snapshot_path, str) and snapshot_path: @@ -275,6 +278,30 @@ def _reconcile_running_run(*, run: BackupRun, grace_seconds: int, stale_worker_s run.result = result run.save(update_fields=["status", "ended_at", "rsync_exit_code", "result"]) return True + if _running_rsync_process_missing(run=run, grace_seconds=grace_seconds): + result.update( + { + "ok": False, + "host": run.host.host, + "log": str(log_path) if log_path else "", + "failure": { + "category": "rsync_process", + "message": "The rsync process is no longer running while the backup is still marked running.", + "hint": "Check the rsync log and pobsync-worker.service logs before retrying the backup.", + }, + "rsync": { + **(result.get("rsync") if isinstance(result.get("rsync"), dict) else {}), + "exit_code": exit_code or 255, + "log_tail": log_tail, + }, + } + ) + run.status = BackupRun.Status.FAILED + run.ended_at = timezone.now() + run.rsync_exit_code = exit_code or 255 + run.result = result + run.save(update_fields=["status", "ended_at", "rsync_exit_code", "result"]) + return True if stale_worker: result.update( { @@ -413,6 +440,33 @@ def _running_worker_timed_out(*, run: BackupRun, stale_worker_seconds: int) -> b return timezone.now() >= heartbeat_at + timedelta(seconds=stale_worker_seconds) +def _running_rsync_process_missing(*, run: BackupRun, grace_seconds: int) -> bool: + if grace_seconds <= 0: + return False + result = run.result if isinstance(run.result, dict) else {} + execution = result.get("execution") if isinstance(result.get("execution"), dict) else {} + if execution.get("phase") != "rsync": + return False + rsync = result.get("rsync") if isinstance(result.get("rsync"), dict) else {} + pid = rsync.get("pid") + if not isinstance(pid, int) or pid <= 0: + return False + heartbeat_at = _parse_iso_datetime(execution.get("heartbeat_at")) or run.started_at + if heartbeat_at is None or timezone.now() < heartbeat_at + timedelta(seconds=grace_seconds): + return False + return not _process_exists(pid) + + +def _process_exists(pid: int) -> bool: + try: + os.kill(pid, 0) + except ProcessLookupError: + return False + except PermissionError: + return True + return True + + def _parse_iso_datetime(value: object): if not isinstance(value, str) or not value: return None diff --git a/src/pobsync_backend/tests/test_backup_worker.py b/src/pobsync_backend/tests/test_backup_worker.py index 71536bb..084fc53 100644 --- a/src/pobsync_backend/tests/test_backup_worker.py +++ b/src/pobsync_backend/tests/test_backup_worker.py @@ -160,16 +160,19 @@ class BackupWorkerTests(TestCase): kwargs["state_callback"]( { "status": "running", + "phase": "rsync", "snapshot": str(snapshot_dir), "log": str(log_path), - "rsync": {"command": ["rsync"], "exit_code": None}, + "rsync": {"command": ["rsync"], "exit_code": None, "pid": 1234, "pgid": 1234}, } ) run.refresh_from_db() self.assertEqual(run.snapshot_path, str(snapshot_dir)) + self.assertEqual(run.result["execution"]["phase"], "rsync") self.assertEqual(run.result["execution"]["log"], str(log_path)) self.assertEqual(run.result["execution"]["snapshot"], str(snapshot_dir)) self.assertEqual(run.result["rsync"]["command"], ["rsync"]) + self.assertEqual(run.result["rsync"]["pid"], 1234) return { "ok": True, "dry_run": False, @@ -228,6 +231,49 @@ class BackupWorkerTests(TestCase): self.assertEqual(run.result["failure"]["category"], "transport") self.assertIn("Broken pipe", "\n".join(run.result["rsync"]["log_tail"])) + def test_worker_reconciles_real_run_when_rsync_process_disappears(self) -> None: + with TemporaryDirectory() as tmp: + host = HostConfig.objects.create(host="web-01", address="web-01.example.test") + run = queue_backup_run(host=host) + log_path = Path(tmp) / "backups" / host.host / ".incomplete" / "20260519-021500Z__ABCDEFGH" / "meta" / "rsync.log" + log_path.parent.mkdir(parents=True, exist_ok=True) + log_path.write_text("sending incremental file list\n", encoding="utf-8") + run.status = BackupRun.Status.RUNNING + run.started_at = timezone.now() - timedelta(minutes=10) + run.result["execution"] = { + "phase": "rsync", + "log": str(log_path), + "heartbeat_at": (timezone.now() - timedelta(minutes=10)).isoformat(), + } + run.result["rsync"] = {"pid": 999999, "pgid": 999999, "command": ["rsync"]} + run.save(update_fields=["status", "started_at", "result"]) + + reconciled = reconcile_running_runs(grace_seconds=300, stale_worker_seconds=24 * 60 * 60) + + self.assertEqual(reconciled, 1) + run.refresh_from_db() + self.assertEqual(run.status, BackupRun.Status.FAILED) + self.assertEqual(run.result["failure"]["category"], "rsync_process") + self.assertEqual(run.rsync_exit_code, 255) + + def test_worker_does_not_reconcile_missing_rsync_process_during_finalizing_phase(self) -> None: + host = HostConfig.objects.create(host="web-01", address="web-01.example.test") + run = queue_backup_run(host=host) + run.status = BackupRun.Status.RUNNING + run.started_at = timezone.now() - timedelta(minutes=10) + run.result["execution"] = { + "phase": "finalizing", + "heartbeat_at": (timezone.now() - timedelta(minutes=10)).isoformat(), + } + run.result["rsync"] = {"pid": 999999, "pgid": 999999, "command": ["rsync"], "exit_code": 0} + run.save(update_fields=["status", "started_at", "result"]) + + reconciled = reconcile_running_runs(grace_seconds=300, stale_worker_seconds=24 * 60 * 60) + + self.assertEqual(reconciled, 0) + run.refresh_from_db() + self.assertEqual(run.status, BackupRun.Status.RUNNING) + def test_worker_does_not_fail_real_run_for_vanished_file_warning_log(self) -> None: with TemporaryDirectory() as tmp: host = HostConfig.objects.create(host="web-01", address="web-01.example.test") diff --git a/src/pobsync_backend/tests/test_run_scheduled_config_source.py b/src/pobsync_backend/tests/test_run_scheduled_config_source.py index 0285143..117c91d 100644 --- a/src/pobsync_backend/tests/test_run_scheduled_config_source.py +++ b/src/pobsync_backend/tests/test_run_scheduled_config_source.py @@ -259,11 +259,18 @@ class RunScheduledConfigSourceTests(SimpleTestCase): def test_real_run_reports_running_state_callback_before_rsync_returns(self) -> None: states = [] - def fake_run_rsync(command, log_path, timeout_seconds, cancel_check=None): + def fake_run_rsync(command, log_path, timeout_seconds, cancel_check=None, process_started=None): self.assertEqual(len(states), 1) self.assertEqual(states[0]["status"], "running") + self.assertEqual(states[0]["phase"], "preparing") self.assertEqual(states[0]["log"], str(log_path)) self.assertEqual(states[0]["rsync"]["command"], command) + self.assertIsNotNone(process_started) + process_started(1234, 1234) + self.assertEqual(len(states), 2) + self.assertEqual(states[1]["phase"], "rsync") + self.assertEqual(states[1]["rsync"]["pid"], 1234) + self.assertEqual(states[1]["rsync"]["pgid"], 1234) log_path.write_text("Number of files: 1\n", encoding="utf-8") return RsyncResult(exit_code=0, command=command) @@ -277,8 +284,10 @@ class RunScheduledConfigSourceTests(SimpleTestCase): state_callback=states.append, ) - self.assertEqual(len(states), 1) + self.assertEqual(len(states), 3) self.assertIn("/.incomplete/", states[0]["snapshot"]) + self.assertEqual(states[2]["phase"], "finalizing") + self.assertEqual(states[2]["rsync"]["exit_code"], 0) def test_real_run_keeps_snapshot_with_warning_for_vanished_files(self) -> None: def fake_run_rsync(command, log_path, timeout_seconds, cancel_check=None):