From 4c8ed24561c895fc162c41bc8bb81c0adc6049c3 Mon Sep 17 00:00:00 2001 From: Peter van Arkel Date: Thu, 21 May 2026 03:16:38 +0200 Subject: [PATCH] (release) Track worker heartbeat for running jobs Record worker pid, host, claim time, and heartbeat metadata on running backup jobs so operators can see which worker owns a run. Refresh the heartbeat while rsync is active and reconcile stale running runs when the worker heartbeat stops. Add a worker option to tune or disable stale-run reconciliation. Refs #11 --- src/pobsync_backend/backup_runner.py | 99 +++++++++++++++++-- .../management/commands/run_pobsync_worker.py | 12 ++- .../templates/pobsync_backend/run_detail.html | 4 + .../tests/test_backup_worker.py | 54 ++++++++++ src/pobsync_backend/tests/test_views.py | 23 +++++ src/pobsync_backend/views.py | 2 + 6 files changed, 184 insertions(+), 10 deletions(-) diff --git a/src/pobsync_backend/backup_runner.py b/src/pobsync_backend/backup_runner.py index 68bbb6f..d75c07c 100644 --- a/src/pobsync_backend/backup_runner.py +++ b/src/pobsync_backend/backup_runner.py @@ -1,6 +1,8 @@ from __future__ import annotations -from datetime import timedelta +import os +import socket +from datetime import timedelta, timezone as datetime_timezone from pathlib import Path from django.db import transaction @@ -158,10 +160,10 @@ def claim_next_queued_run() -> BackupRun | None: return run -def reconcile_running_runs(*, grace_seconds: int = 300) -> int: +def reconcile_running_runs(*, grace_seconds: int = 300, stale_worker_seconds: int = 24 * 60 * 60) -> int: reconciled = 0 for run in BackupRun.objects.select_related("host").filter(status=BackupRun.Status.RUNNING).order_by("started_at", "id"): - if _reconcile_running_run(run=run, grace_seconds=grace_seconds): + if _reconcile_running_run(run=run, grace_seconds=grace_seconds, stale_worker_seconds=stale_worker_seconds): reconciled += 1 return reconciled @@ -176,7 +178,9 @@ def requested_options(run: BackupRun) -> dict[str, object]: def _running_result(*, run: BackupRun, dry_run: bool) -> dict[str, object]: result = dict(run.result) if isinstance(run.result, dict) else {} execution = { + **_worker_execution_details(), "started_at": (run.started_at or timezone.now()).isoformat(), + "heartbeat_at": timezone.now().isoformat(), } if dry_run: execution["log"] = str(dry_run_log_path(run.host.host, run_id=run.id)) @@ -185,24 +189,56 @@ def _running_result(*, run: BackupRun, dry_run: bool) -> dict[str, object]: def _run_cancel_requested(run_id: int) -> bool: - return BackupRun.objects.filter(id=run_id, status=BackupRun.Status.CANCELLED).exists() + try: + run = BackupRun.objects.only("id", "status", "result").get(id=run_id) + except BackupRun.DoesNotExist: + return True + if run.status == BackupRun.Status.CANCELLED: + return True + if run.status == BackupRun.Status.RUNNING: + _refresh_run_heartbeat(run) + return False -def _reconcile_running_run(*, run: BackupRun, grace_seconds: int) -> bool: +def _reconcile_running_run(*, run: BackupRun, grace_seconds: int, stale_worker_seconds: int) -> bool: result = run.result if isinstance(run.result, dict) else {} requested = result.get("requested") if isinstance(result.get("requested"), dict) else {} + stale_worker = _running_worker_timed_out(run=run, stale_worker_seconds=stale_worker_seconds) if not requested.get("dry_run"): + if stale_worker: + result.update( + { + "ok": False, + "host": run.host.host, + "failure": { + "category": "worker", + "message": "The worker heartbeat stopped before the run finished.", + "hint": "Check pobsync-worker.service logs before retrying the backup.", + }, + } + ) + run.status = BackupRun.Status.FAILED + run.ended_at = timezone.now() + run.result = result + run.save(update_fields=["status", "ended_at", "result"]) + return True return False log_path = _execution_log_path(result) log_tail = _read_log_tail(log_path) if log_path is not None else [] terminal_log = _terminal_rsync_log(log_tail) timed_out = _running_dry_run_timed_out(run=run, grace_seconds=grace_seconds) - if not terminal_log and not timed_out: + if not terminal_log and not timed_out and not stale_worker: return False - exit_code = _exit_code_from_log(log_tail) or (124 if timed_out else 255) + exit_code = _exit_code_from_log(log_tail) or (124 if timed_out or stale_worker else 255) failure = classify_rsync_failure(exit_code, log_tail) + if stale_worker and not terminal_log: + failure = { + "category": "worker", + "message": "The worker heartbeat stopped before the dry-run finished.", + "hint": "Check pobsync-worker.service logs before retrying the dry-run.", + } result.update( { "ok": False, @@ -226,6 +262,30 @@ def _reconcile_running_run(*, run: BackupRun, grace_seconds: int) -> bool: return True +def _worker_execution_details() -> dict[str, object]: + return { + "worker_pid": os.getpid(), + "worker_host": socket.gethostname(), + "claimed_at": timezone.now().isoformat(), + } + + +def _refresh_run_heartbeat(run: BackupRun, *, interval_seconds: int = 30) -> None: + result = run.result if isinstance(run.result, dict) else {} + execution = result.get("execution") if isinstance(result.get("execution"), dict) else {} + heartbeat_at = _parse_iso_datetime(execution.get("heartbeat_at")) + if heartbeat_at is not None and timezone.now() < heartbeat_at + timedelta(seconds=interval_seconds): + return + result["execution"] = { + **execution, + "worker_pid": os.getpid(), + "worker_host": socket.gethostname(), + "heartbeat_at": timezone.now().isoformat(), + } + run.result = result + run.save(update_fields=["result"]) + + def _execution_log_path(result: dict[str, object]) -> Path | None: execution = result.get("execution") if isinstance(result.get("execution"), dict) else {} log = execution.get("log") or result.get("log") @@ -266,3 +326,28 @@ def _running_dry_run_timed_out(*, run: BackupRun, grace_seconds: int) -> bool: if not isinstance(timeout_seconds, int) or timeout_seconds <= 0: timeout_seconds = DEFAULT_DRY_RUN_TIMEOUT_SECONDS return timezone.now() >= run.started_at + timedelta(seconds=timeout_seconds + grace_seconds) + + +def _running_worker_timed_out(*, run: BackupRun, stale_worker_seconds: int) -> bool: + if stale_worker_seconds <= 0: + return False + result = run.result if isinstance(run.result, dict) else {} + execution = result.get("execution") if isinstance(result.get("execution"), dict) else {} + heartbeat_at = _parse_iso_datetime(execution.get("heartbeat_at")) + if heartbeat_at is None: + heartbeat_at = run.started_at + if heartbeat_at is None: + return False + return timezone.now() >= heartbeat_at + timedelta(seconds=stale_worker_seconds) + + +def _parse_iso_datetime(value: object): + if not isinstance(value, str) or not value: + return None + try: + parsed = timezone.datetime.fromisoformat(value) + except ValueError: + return None + if timezone.is_naive(parsed): + return timezone.make_aware(parsed, timezone=datetime_timezone.utc) + return parsed diff --git a/src/pobsync_backend/management/commands/run_pobsync_worker.py b/src/pobsync_backend/management/commands/run_pobsync_worker.py index 1b3b200..b44ff91 100644 --- a/src/pobsync_backend/management/commands/run_pobsync_worker.py +++ b/src/pobsync_backend/management/commands/run_pobsync_worker.py @@ -19,6 +19,12 @@ class Command(BaseCommand): parser.add_argument("--once", action="store_true", help="Process one queued run and exit") parser.add_argument("--loop", action="store_true", help="Keep checking for queued runs") parser.add_argument("--interval", type=int, default=15, help="Loop interval in seconds") + parser.add_argument( + "--stale-running-seconds", + type=int, + default=24 * 60 * 60, + help="Mark running runs failed after this many seconds without a worker heartbeat; use 0 to disable", + ) def handle(self, *args: Any, **options: Any) -> None: if not options["once"] and not options["loop"]: @@ -26,14 +32,14 @@ class Command(BaseCommand): paths = PobsyncPaths(home=Path(options["prefix"])) while True: - count = self._run_once(prefix=paths.home) + count = self._run_once(prefix=paths.home, stale_running_seconds=int(options["stale_running_seconds"])) self.stdout.write(f"Ran {count} queued backup run(s).") if options["once"]: return time.sleep(max(1, int(options["interval"]))) - def _run_once(self, *, prefix: Path) -> int: - reconciled = reconcile_running_runs() + def _run_once(self, *, prefix: Path, stale_running_seconds: int = 24 * 60 * 60) -> int: + reconciled = reconcile_running_runs(stale_worker_seconds=stale_running_seconds) run = claim_next_queued_run() if run is None: return reconciled diff --git a/src/pobsync_backend/templates/pobsync_backend/run_detail.html b/src/pobsync_backend/templates/pobsync_backend/run_detail.html index 6d82121..7526e33 100644 --- a/src/pobsync_backend/templates/pobsync_backend/run_detail.html +++ b/src/pobsync_backend/templates/pobsync_backend/run_detail.html @@ -79,6 +79,10 @@
Created: {{ run.created_at }}
Started: {{ run.started_at|default:"" }}
Ended: {{ run.ended_at|default:"" }}
+ {% if execution %} +
Worker: {{ execution.worker_host|default:"unknown" }}{% if execution.worker_pid %} pid {{ execution.worker_pid }}{% endif %}
+
Worker heartbeat: {{ execution.heartbeat_at|default:"" }}
+ {% endif %} diff --git a/src/pobsync_backend/tests/test_backup_worker.py b/src/pobsync_backend/tests/test_backup_worker.py index 1f7175c..99db08d 100644 --- a/src/pobsync_backend/tests/test_backup_worker.py +++ b/src/pobsync_backend/tests/test_backup_worker.py @@ -61,6 +61,9 @@ class BackupWorkerTests(TestCase): def fake_run_scheduled(**kwargs): run.refresh_from_db() self.assertIn("execution", run.result) + self.assertIn("worker_pid", run.result["execution"]) + self.assertIn("worker_host", run.result["execution"]) + self.assertIn("heartbeat_at", run.result["execution"]) return { "ok": True, "dry_run": False, @@ -82,6 +85,57 @@ class BackupWorkerTests(TestCase): self.assertEqual(SnapshotRecord.objects.count(), 1) self.assertEqual(run.snapshot, SnapshotRecord.objects.get()) + def test_worker_refreshes_heartbeat_while_run_is_active(self) -> None: + with TemporaryDirectory() as tmp: + GlobalConfig.objects.create(name="default", backup_root=str(Path(tmp) / "backups")) + host = HostConfig.objects.create(host="web-01", address="web-01.example.test") + run = queue_backup_run(host=host) + + with patch("pobsync_backend.backup_runner.run_scheduled") as run_scheduled: + def fake_run_scheduled(**kwargs): + run.refresh_from_db() + old_heartbeat = timezone.now() - timedelta(seconds=120) + run.result["execution"]["heartbeat_at"] = old_heartbeat.isoformat() + run.save(update_fields=["result"]) + + self.assertFalse(kwargs["cancel_check"]()) + run.refresh_from_db() + self.assertGreater( + timezone.datetime.fromisoformat(run.result["execution"]["heartbeat_at"]), + old_heartbeat, + ) + return { + "ok": True, + "dry_run": False, + "host": host.host, + "snapshot": "", + "base": None, + "rsync": {"exit_code": 0}, + } + + run_scheduled.side_effect = fake_run_scheduled + Command()._run_once(prefix=Path(tmp) / "home") + + def test_worker_reconciles_stale_real_run_after_heartbeat_timeout(self) -> None: + host = HostConfig.objects.create(host="web-01", address="web-01.example.test") + run = queue_backup_run(host=host) + run.status = BackupRun.Status.RUNNING + run.started_at = timezone.now() - timedelta(seconds=120) + run.result["execution"] = { + "worker_pid": 123, + "worker_host": "backup", + "heartbeat_at": (timezone.now() - timedelta(seconds=90)).isoformat(), + } + run.save(update_fields=["status", "started_at", "result"]) + + reconciled = reconcile_running_runs(stale_worker_seconds=30) + + self.assertEqual(reconciled, 1) + run.refresh_from_db() + self.assertEqual(run.status, BackupRun.Status.FAILED) + self.assertEqual(run.result["failure"]["category"], "worker") + self.assertIn("heartbeat stopped", run.result["failure"]["message"]) + def test_worker_records_dry_run_log_path_while_running(self) -> None: with TemporaryDirectory() as tmp: GlobalConfig.objects.create(name="default", backup_root=str(Path(tmp) / "backups")) diff --git a/src/pobsync_backend/tests/test_views.py b/src/pobsync_backend/tests/test_views.py index f648061..3e59f30 100644 --- a/src/pobsync_backend/tests/test_views.py +++ b/src/pobsync_backend/tests/test_views.py @@ -1373,6 +1373,29 @@ class ViewTests(TestCase): self.assertContains(response, "Cancel run") self.assertContains(response, reverse("cancel_run", args=[run.id])) + def test_run_detail_renders_worker_execution_metadata(self) -> None: + self.client.force_login(self.staff_user) + host = HostConfig.objects.create(host="web-01", address="web-01.example.test") + run = BackupRun.objects.create( + host=host, + status=BackupRun.Status.RUNNING, + result={ + "execution": { + "worker_host": "backup-01", + "worker_pid": 4242, + "heartbeat_at": "2026-05-21T10:30:00+00:00", + } + }, + ) + + response = self.client.get(reverse("run_detail", args=[run.id])) + + self.assertEqual(response.status_code, 200) + self.assertContains(response, "Worker:") + self.assertContains(response, "backup-01") + self.assertContains(response, "pid 4242") + self.assertContains(response, "Worker heartbeat:") + def test_cancel_run_marks_queued_run_cancelled(self) -> None: self.client.force_login(self.staff_user) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") diff --git a/src/pobsync_backend/views.py b/src/pobsync_backend/views.py index 2d22176..7561287 100644 --- a/src/pobsync_backend/views.py +++ b/src/pobsync_backend/views.py @@ -448,6 +448,7 @@ def run_detail(request, run_id: int): rsync_result = result.get("rsync") if isinstance(result.get("rsync"), dict) else {} failure = result.get("failure") if isinstance(result.get("failure"), dict) else {} prune_result = result.get("prune") if isinstance(result.get("prune"), dict) else {} + execution = result.get("execution") if isinstance(result.get("execution"), dict) else {} rsync_log_path = _run_rsync_log_path(run) rsync_log_tail = _run_rsync_log_tail(rsync_result, rsync_log_path) requested = result.get("requested") if isinstance(result.get("requested"), dict) else {} @@ -455,6 +456,7 @@ def run_detail(request, run_id: int): "run": run, "can_cancel": run.status in {BackupRun.Status.QUEUED, BackupRun.Status.RUNNING}, "requested": requested, + "execution": execution, "stats": run_stats if isinstance(run_stats, dict) else {}, "rsync": rsync_result, "rsync_command": _run_rsync_command(rsync_result),