diff --git a/src/pobsync/commands/run_scheduled.py b/src/pobsync/commands/run_scheduled.py index 665f16c..2268ed5 100644 --- a/src/pobsync/commands/run_scheduled.py +++ b/src/pobsync/commands/run_scheduled.py @@ -346,16 +346,46 @@ def run_scheduled( state_callback( { "status": "running", + "phase": "preparing", "snapshot": str(incomplete_dir), "log": str(log_path), "rsync": {"command": cmd, "exit_code": None}, } ) - result = run_rsync(cmd, log_path=log_path, timeout_seconds=timeout_seconds, cancel_check=cancel_check) + def process_started(pid: int, pgid: int) -> None: + if state_callback is None: + return + state_callback( + { + "status": "running", + "phase": "rsync", + "snapshot": str(incomplete_dir), + "log": str(log_path), + "rsync": {"command": cmd, "exit_code": None, "pid": pid, "pgid": pgid}, + } + ) + + result = run_rsync( + cmd, + log_path=log_path, + timeout_seconds=timeout_seconds, + cancel_check=cancel_check, + process_started=process_started if state_callback is not None else None, + ) log_tail = _read_log_tail(log_path) warning = classify_rsync_warning(result.exit_code, log_tail) successful_or_warning = result.exit_code == 0 or warning is not None + if state_callback is not None: + state_callback( + { + "status": "running", + "phase": "finalizing", + "snapshot": str(incomplete_dir), + "log": str(log_path), + "rsync": {"command": cmd, "exit_code": result.exit_code, "log_tail": log_tail}, + } + ) end_ts = utc_now() meta["ended_at"] = format_iso_z(end_ts) diff --git a/src/pobsync/rsync.py b/src/pobsync/rsync.py index 2f0acb0..32fa2f6 100644 --- a/src/pobsync/rsync.py +++ b/src/pobsync/rsync.py @@ -82,6 +82,7 @@ def run_rsync( log_path: Path, timeout_seconds: int, cancel_check: Callable[[], bool] | None = None, + process_started: Callable[[int, int], None] | None = None, ) -> RsyncResult: """ Run rsync and always write stdout/stderr to log_path. @@ -95,6 +96,8 @@ def run_rsync( with log_path.open("ab") as f: process = subprocess.Popen(command, stdout=f, stderr=subprocess.STDOUT, start_new_session=True) + if process_started is not None: + process_started(process.pid, os.getpgid(process.pid)) started = time.monotonic() while True: exit_code = process.poll() diff --git a/src/pobsync_backend/backup_runner.py b/src/pobsync_backend/backup_runner.py index dece829..8b7f9ec 100644 --- a/src/pobsync_backend/backup_runner.py +++ b/src/pobsync_backend/backup_runner.py @@ -225,6 +225,9 @@ def _record_running_state(run_id: int, state: dict[str, object]) -> None: log_path = state.get("log") snapshot_path = state.get("snapshot") + phase = state.get("phase") + if isinstance(phase, str) and phase: + execution["phase"] = phase if isinstance(log_path, str) and log_path: execution["log"] = log_path if isinstance(snapshot_path, str) and snapshot_path: @@ -275,6 +278,30 @@ def _reconcile_running_run(*, run: BackupRun, grace_seconds: int, stale_worker_s run.result = result run.save(update_fields=["status", "ended_at", "rsync_exit_code", "result"]) return True + if _running_rsync_process_missing(run=run, grace_seconds=grace_seconds): + result.update( + { + "ok": False, + "host": run.host.host, + "log": str(log_path) if log_path else "", + "failure": { + "category": "rsync_process", + "message": "The rsync process is no longer running while the backup is still marked running.", + "hint": "Check the rsync log and pobsync-worker.service logs before retrying the backup.", + }, + "rsync": { + **(result.get("rsync") if isinstance(result.get("rsync"), dict) else {}), + "exit_code": exit_code or 255, + "log_tail": log_tail, + }, + } + ) + run.status = BackupRun.Status.FAILED + run.ended_at = timezone.now() + run.rsync_exit_code = exit_code or 255 + run.result = result + run.save(update_fields=["status", "ended_at", "rsync_exit_code", "result"]) + return True if stale_worker: result.update( { @@ -413,6 +440,33 @@ def _running_worker_timed_out(*, run: BackupRun, stale_worker_seconds: int) -> b return timezone.now() >= heartbeat_at + timedelta(seconds=stale_worker_seconds) +def _running_rsync_process_missing(*, run: BackupRun, grace_seconds: int) -> bool: + if grace_seconds <= 0: + return False + result = run.result if isinstance(run.result, dict) else {} + execution = result.get("execution") if isinstance(result.get("execution"), dict) else {} + if execution.get("phase") != "rsync": + return False + rsync = result.get("rsync") if isinstance(result.get("rsync"), dict) else {} + pid = rsync.get("pid") + if not isinstance(pid, int) or pid <= 0: + return False + heartbeat_at = _parse_iso_datetime(execution.get("heartbeat_at")) or run.started_at + if heartbeat_at is None or timezone.now() < heartbeat_at + timedelta(seconds=grace_seconds): + return False + return not _process_exists(pid) + + +def _process_exists(pid: int) -> bool: + try: + os.kill(pid, 0) + except ProcessLookupError: + return False + except PermissionError: + return True + return True + + def _parse_iso_datetime(value: object): if not isinstance(value, str) or not value: return None diff --git a/src/pobsync_backend/tests/test_backup_worker.py b/src/pobsync_backend/tests/test_backup_worker.py index 71536bb..084fc53 100644 --- a/src/pobsync_backend/tests/test_backup_worker.py +++ b/src/pobsync_backend/tests/test_backup_worker.py @@ -160,16 +160,19 @@ class BackupWorkerTests(TestCase): kwargs["state_callback"]( { "status": "running", + "phase": "rsync", "snapshot": str(snapshot_dir), "log": str(log_path), - "rsync": {"command": ["rsync"], "exit_code": None}, + "rsync": {"command": ["rsync"], "exit_code": None, "pid": 1234, "pgid": 1234}, } ) run.refresh_from_db() self.assertEqual(run.snapshot_path, str(snapshot_dir)) + self.assertEqual(run.result["execution"]["phase"], "rsync") self.assertEqual(run.result["execution"]["log"], str(log_path)) self.assertEqual(run.result["execution"]["snapshot"], str(snapshot_dir)) self.assertEqual(run.result["rsync"]["command"], ["rsync"]) + self.assertEqual(run.result["rsync"]["pid"], 1234) return { "ok": True, "dry_run": False, @@ -228,6 +231,49 @@ class BackupWorkerTests(TestCase): self.assertEqual(run.result["failure"]["category"], "transport") self.assertIn("Broken pipe", "\n".join(run.result["rsync"]["log_tail"])) + def test_worker_reconciles_real_run_when_rsync_process_disappears(self) -> None: + with TemporaryDirectory() as tmp: + host = HostConfig.objects.create(host="web-01", address="web-01.example.test") + run = queue_backup_run(host=host) + log_path = Path(tmp) / "backups" / host.host / ".incomplete" / "20260519-021500Z__ABCDEFGH" / "meta" / "rsync.log" + log_path.parent.mkdir(parents=True, exist_ok=True) + log_path.write_text("sending incremental file list\n", encoding="utf-8") + run.status = BackupRun.Status.RUNNING + run.started_at = timezone.now() - timedelta(minutes=10) + run.result["execution"] = { + "phase": "rsync", + "log": str(log_path), + "heartbeat_at": (timezone.now() - timedelta(minutes=10)).isoformat(), + } + run.result["rsync"] = {"pid": 999999, "pgid": 999999, "command": ["rsync"]} + run.save(update_fields=["status", "started_at", "result"]) + + reconciled = reconcile_running_runs(grace_seconds=300, stale_worker_seconds=24 * 60 * 60) + + self.assertEqual(reconciled, 1) + run.refresh_from_db() + self.assertEqual(run.status, BackupRun.Status.FAILED) + self.assertEqual(run.result["failure"]["category"], "rsync_process") + self.assertEqual(run.rsync_exit_code, 255) + + def test_worker_does_not_reconcile_missing_rsync_process_during_finalizing_phase(self) -> None: + host = HostConfig.objects.create(host="web-01", address="web-01.example.test") + run = queue_backup_run(host=host) + run.status = BackupRun.Status.RUNNING + run.started_at = timezone.now() - timedelta(minutes=10) + run.result["execution"] = { + "phase": "finalizing", + "heartbeat_at": (timezone.now() - timedelta(minutes=10)).isoformat(), + } + run.result["rsync"] = {"pid": 999999, "pgid": 999999, "command": ["rsync"], "exit_code": 0} + run.save(update_fields=["status", "started_at", "result"]) + + reconciled = reconcile_running_runs(grace_seconds=300, stale_worker_seconds=24 * 60 * 60) + + self.assertEqual(reconciled, 0) + run.refresh_from_db() + self.assertEqual(run.status, BackupRun.Status.RUNNING) + def test_worker_does_not_fail_real_run_for_vanished_file_warning_log(self) -> None: with TemporaryDirectory() as tmp: host = HostConfig.objects.create(host="web-01", address="web-01.example.test") diff --git a/src/pobsync_backend/tests/test_run_scheduled_config_source.py b/src/pobsync_backend/tests/test_run_scheduled_config_source.py index 0285143..117c91d 100644 --- a/src/pobsync_backend/tests/test_run_scheduled_config_source.py +++ b/src/pobsync_backend/tests/test_run_scheduled_config_source.py @@ -259,11 +259,18 @@ class RunScheduledConfigSourceTests(SimpleTestCase): def test_real_run_reports_running_state_callback_before_rsync_returns(self) -> None: states = [] - def fake_run_rsync(command, log_path, timeout_seconds, cancel_check=None): + def fake_run_rsync(command, log_path, timeout_seconds, cancel_check=None, process_started=None): self.assertEqual(len(states), 1) self.assertEqual(states[0]["status"], "running") + self.assertEqual(states[0]["phase"], "preparing") self.assertEqual(states[0]["log"], str(log_path)) self.assertEqual(states[0]["rsync"]["command"], command) + self.assertIsNotNone(process_started) + process_started(1234, 1234) + self.assertEqual(len(states), 2) + self.assertEqual(states[1]["phase"], "rsync") + self.assertEqual(states[1]["rsync"]["pid"], 1234) + self.assertEqual(states[1]["rsync"]["pgid"], 1234) log_path.write_text("Number of files: 1\n", encoding="utf-8") return RsyncResult(exit_code=0, command=command) @@ -277,8 +284,10 @@ class RunScheduledConfigSourceTests(SimpleTestCase): state_callback=states.append, ) - self.assertEqual(len(states), 1) + self.assertEqual(len(states), 3) self.assertIn("/.incomplete/", states[0]["snapshot"]) + self.assertEqual(states[2]["phase"], "finalizing") + self.assertEqual(states[2]["rsync"]["exit_code"], 0) def test_real_run_keeps_snapshot_with_warning_for_vanished_files(self) -> None: def fake_run_rsync(command, log_path, timeout_seconds, cancel_check=None):