(release) Track worker heartbeat for running jobs

Record worker pid, host, claim time, and heartbeat metadata on running
backup jobs so operators can see which worker owns a run.

Refresh the heartbeat while rsync is active and reconcile stale running
runs when the worker heartbeat stops. Add a worker option to tune or
disable stale-run reconciliation.

Refs #11
This commit is contained in:
2026-05-21 03:16:38 +02:00
parent 404b7f7500
commit 4c8ed24561
6 changed files with 184 additions and 10 deletions

View File

@@ -1,6 +1,8 @@
from __future__ import annotations from __future__ import annotations
from datetime import timedelta import os
import socket
from datetime import timedelta, timezone as datetime_timezone
from pathlib import Path from pathlib import Path
from django.db import transaction from django.db import transaction
@@ -158,10 +160,10 @@ def claim_next_queued_run() -> BackupRun | None:
return run return run
def reconcile_running_runs(*, grace_seconds: int = 300) -> int: def reconcile_running_runs(*, grace_seconds: int = 300, stale_worker_seconds: int = 24 * 60 * 60) -> int:
reconciled = 0 reconciled = 0
for run in BackupRun.objects.select_related("host").filter(status=BackupRun.Status.RUNNING).order_by("started_at", "id"): for run in BackupRun.objects.select_related("host").filter(status=BackupRun.Status.RUNNING).order_by("started_at", "id"):
if _reconcile_running_run(run=run, grace_seconds=grace_seconds): if _reconcile_running_run(run=run, grace_seconds=grace_seconds, stale_worker_seconds=stale_worker_seconds):
reconciled += 1 reconciled += 1
return reconciled return reconciled
@@ -176,7 +178,9 @@ def requested_options(run: BackupRun) -> dict[str, object]:
def _running_result(*, run: BackupRun, dry_run: bool) -> dict[str, object]: def _running_result(*, run: BackupRun, dry_run: bool) -> dict[str, object]:
result = dict(run.result) if isinstance(run.result, dict) else {} result = dict(run.result) if isinstance(run.result, dict) else {}
execution = { execution = {
**_worker_execution_details(),
"started_at": (run.started_at or timezone.now()).isoformat(), "started_at": (run.started_at or timezone.now()).isoformat(),
"heartbeat_at": timezone.now().isoformat(),
} }
if dry_run: if dry_run:
execution["log"] = str(dry_run_log_path(run.host.host, run_id=run.id)) execution["log"] = str(dry_run_log_path(run.host.host, run_id=run.id))
@@ -185,24 +189,56 @@ def _running_result(*, run: BackupRun, dry_run: bool) -> dict[str, object]:
def _run_cancel_requested(run_id: int) -> bool: def _run_cancel_requested(run_id: int) -> bool:
return BackupRun.objects.filter(id=run_id, status=BackupRun.Status.CANCELLED).exists() try:
run = BackupRun.objects.only("id", "status", "result").get(id=run_id)
except BackupRun.DoesNotExist:
return True
if run.status == BackupRun.Status.CANCELLED:
return True
if run.status == BackupRun.Status.RUNNING:
_refresh_run_heartbeat(run)
return False
def _reconcile_running_run(*, run: BackupRun, grace_seconds: int) -> bool: def _reconcile_running_run(*, run: BackupRun, grace_seconds: int, stale_worker_seconds: int) -> bool:
result = run.result if isinstance(run.result, dict) else {} result = run.result if isinstance(run.result, dict) else {}
requested = result.get("requested") if isinstance(result.get("requested"), dict) else {} requested = result.get("requested") if isinstance(result.get("requested"), dict) else {}
stale_worker = _running_worker_timed_out(run=run, stale_worker_seconds=stale_worker_seconds)
if not requested.get("dry_run"): if not requested.get("dry_run"):
if stale_worker:
result.update(
{
"ok": False,
"host": run.host.host,
"failure": {
"category": "worker",
"message": "The worker heartbeat stopped before the run finished.",
"hint": "Check pobsync-worker.service logs before retrying the backup.",
},
}
)
run.status = BackupRun.Status.FAILED
run.ended_at = timezone.now()
run.result = result
run.save(update_fields=["status", "ended_at", "result"])
return True
return False return False
log_path = _execution_log_path(result) log_path = _execution_log_path(result)
log_tail = _read_log_tail(log_path) if log_path is not None else [] log_tail = _read_log_tail(log_path) if log_path is not None else []
terminal_log = _terminal_rsync_log(log_tail) terminal_log = _terminal_rsync_log(log_tail)
timed_out = _running_dry_run_timed_out(run=run, grace_seconds=grace_seconds) timed_out = _running_dry_run_timed_out(run=run, grace_seconds=grace_seconds)
if not terminal_log and not timed_out: if not terminal_log and not timed_out and not stale_worker:
return False return False
exit_code = _exit_code_from_log(log_tail) or (124 if timed_out else 255) exit_code = _exit_code_from_log(log_tail) or (124 if timed_out or stale_worker else 255)
failure = classify_rsync_failure(exit_code, log_tail) failure = classify_rsync_failure(exit_code, log_tail)
if stale_worker and not terminal_log:
failure = {
"category": "worker",
"message": "The worker heartbeat stopped before the dry-run finished.",
"hint": "Check pobsync-worker.service logs before retrying the dry-run.",
}
result.update( result.update(
{ {
"ok": False, "ok": False,
@@ -226,6 +262,30 @@ def _reconcile_running_run(*, run: BackupRun, grace_seconds: int) -> bool:
return True return True
def _worker_execution_details() -> dict[str, object]:
return {
"worker_pid": os.getpid(),
"worker_host": socket.gethostname(),
"claimed_at": timezone.now().isoformat(),
}
def _refresh_run_heartbeat(run: BackupRun, *, interval_seconds: int = 30) -> None:
result = run.result if isinstance(run.result, dict) else {}
execution = result.get("execution") if isinstance(result.get("execution"), dict) else {}
heartbeat_at = _parse_iso_datetime(execution.get("heartbeat_at"))
if heartbeat_at is not None and timezone.now() < heartbeat_at + timedelta(seconds=interval_seconds):
return
result["execution"] = {
**execution,
"worker_pid": os.getpid(),
"worker_host": socket.gethostname(),
"heartbeat_at": timezone.now().isoformat(),
}
run.result = result
run.save(update_fields=["result"])
def _execution_log_path(result: dict[str, object]) -> Path | None: def _execution_log_path(result: dict[str, object]) -> Path | None:
execution = result.get("execution") if isinstance(result.get("execution"), dict) else {} execution = result.get("execution") if isinstance(result.get("execution"), dict) else {}
log = execution.get("log") or result.get("log") log = execution.get("log") or result.get("log")
@@ -266,3 +326,28 @@ def _running_dry_run_timed_out(*, run: BackupRun, grace_seconds: int) -> bool:
if not isinstance(timeout_seconds, int) or timeout_seconds <= 0: if not isinstance(timeout_seconds, int) or timeout_seconds <= 0:
timeout_seconds = DEFAULT_DRY_RUN_TIMEOUT_SECONDS timeout_seconds = DEFAULT_DRY_RUN_TIMEOUT_SECONDS
return timezone.now() >= run.started_at + timedelta(seconds=timeout_seconds + grace_seconds) return timezone.now() >= run.started_at + timedelta(seconds=timeout_seconds + grace_seconds)
def _running_worker_timed_out(*, run: BackupRun, stale_worker_seconds: int) -> bool:
if stale_worker_seconds <= 0:
return False
result = run.result if isinstance(run.result, dict) else {}
execution = result.get("execution") if isinstance(result.get("execution"), dict) else {}
heartbeat_at = _parse_iso_datetime(execution.get("heartbeat_at"))
if heartbeat_at is None:
heartbeat_at = run.started_at
if heartbeat_at is None:
return False
return timezone.now() >= heartbeat_at + timedelta(seconds=stale_worker_seconds)
def _parse_iso_datetime(value: object):
if not isinstance(value, str) or not value:
return None
try:
parsed = timezone.datetime.fromisoformat(value)
except ValueError:
return None
if timezone.is_naive(parsed):
return timezone.make_aware(parsed, timezone=datetime_timezone.utc)
return parsed

View File

@@ -19,6 +19,12 @@ class Command(BaseCommand):
parser.add_argument("--once", action="store_true", help="Process one queued run and exit") parser.add_argument("--once", action="store_true", help="Process one queued run and exit")
parser.add_argument("--loop", action="store_true", help="Keep checking for queued runs") parser.add_argument("--loop", action="store_true", help="Keep checking for queued runs")
parser.add_argument("--interval", type=int, default=15, help="Loop interval in seconds") parser.add_argument("--interval", type=int, default=15, help="Loop interval in seconds")
parser.add_argument(
"--stale-running-seconds",
type=int,
default=24 * 60 * 60,
help="Mark running runs failed after this many seconds without a worker heartbeat; use 0 to disable",
)
def handle(self, *args: Any, **options: Any) -> None: def handle(self, *args: Any, **options: Any) -> None:
if not options["once"] and not options["loop"]: if not options["once"] and not options["loop"]:
@@ -26,14 +32,14 @@ class Command(BaseCommand):
paths = PobsyncPaths(home=Path(options["prefix"])) paths = PobsyncPaths(home=Path(options["prefix"]))
while True: while True:
count = self._run_once(prefix=paths.home) count = self._run_once(prefix=paths.home, stale_running_seconds=int(options["stale_running_seconds"]))
self.stdout.write(f"Ran {count} queued backup run(s).") self.stdout.write(f"Ran {count} queued backup run(s).")
if options["once"]: if options["once"]:
return return
time.sleep(max(1, int(options["interval"]))) time.sleep(max(1, int(options["interval"])))
def _run_once(self, *, prefix: Path) -> int: def _run_once(self, *, prefix: Path, stale_running_seconds: int = 24 * 60 * 60) -> int:
reconciled = reconcile_running_runs() reconciled = reconcile_running_runs(stale_worker_seconds=stale_running_seconds)
run = claim_next_queued_run() run = claim_next_queued_run()
if run is None: if run is None:
return reconciled return reconciled

View File

@@ -79,6 +79,10 @@
<div><strong>Created:</strong> {{ run.created_at }}</div> <div><strong>Created:</strong> {{ run.created_at }}</div>
<div><strong>Started:</strong> {{ run.started_at|default:"" }}</div> <div><strong>Started:</strong> {{ run.started_at|default:"" }}</div>
<div><strong>Ended:</strong> {{ run.ended_at|default:"" }}</div> <div><strong>Ended:</strong> {{ run.ended_at|default:"" }}</div>
{% if execution %}
<div><strong>Worker:</strong> {{ execution.worker_host|default:"unknown" }}{% if execution.worker_pid %} pid {{ execution.worker_pid }}{% endif %}</div>
<div><strong>Worker heartbeat:</strong> {{ execution.heartbeat_at|default:"" }}</div>
{% endif %}
</div> </div>
</section> </section>

View File

@@ -61,6 +61,9 @@ class BackupWorkerTests(TestCase):
def fake_run_scheduled(**kwargs): def fake_run_scheduled(**kwargs):
run.refresh_from_db() run.refresh_from_db()
self.assertIn("execution", run.result) self.assertIn("execution", run.result)
self.assertIn("worker_pid", run.result["execution"])
self.assertIn("worker_host", run.result["execution"])
self.assertIn("heartbeat_at", run.result["execution"])
return { return {
"ok": True, "ok": True,
"dry_run": False, "dry_run": False,
@@ -82,6 +85,57 @@ class BackupWorkerTests(TestCase):
self.assertEqual(SnapshotRecord.objects.count(), 1) self.assertEqual(SnapshotRecord.objects.count(), 1)
self.assertEqual(run.snapshot, SnapshotRecord.objects.get()) self.assertEqual(run.snapshot, SnapshotRecord.objects.get())
def test_worker_refreshes_heartbeat_while_run_is_active(self) -> None:
with TemporaryDirectory() as tmp:
GlobalConfig.objects.create(name="default", backup_root=str(Path(tmp) / "backups"))
host = HostConfig.objects.create(host="web-01", address="web-01.example.test")
run = queue_backup_run(host=host)
with patch("pobsync_backend.backup_runner.run_scheduled") as run_scheduled:
def fake_run_scheduled(**kwargs):
run.refresh_from_db()
old_heartbeat = timezone.now() - timedelta(seconds=120)
run.result["execution"]["heartbeat_at"] = old_heartbeat.isoformat()
run.save(update_fields=["result"])
self.assertFalse(kwargs["cancel_check"]())
run.refresh_from_db()
self.assertGreater(
timezone.datetime.fromisoformat(run.result["execution"]["heartbeat_at"]),
old_heartbeat,
)
return {
"ok": True,
"dry_run": False,
"host": host.host,
"snapshot": "",
"base": None,
"rsync": {"exit_code": 0},
}
run_scheduled.side_effect = fake_run_scheduled
Command()._run_once(prefix=Path(tmp) / "home")
def test_worker_reconciles_stale_real_run_after_heartbeat_timeout(self) -> None:
host = HostConfig.objects.create(host="web-01", address="web-01.example.test")
run = queue_backup_run(host=host)
run.status = BackupRun.Status.RUNNING
run.started_at = timezone.now() - timedelta(seconds=120)
run.result["execution"] = {
"worker_pid": 123,
"worker_host": "backup",
"heartbeat_at": (timezone.now() - timedelta(seconds=90)).isoformat(),
}
run.save(update_fields=["status", "started_at", "result"])
reconciled = reconcile_running_runs(stale_worker_seconds=30)
self.assertEqual(reconciled, 1)
run.refresh_from_db()
self.assertEqual(run.status, BackupRun.Status.FAILED)
self.assertEqual(run.result["failure"]["category"], "worker")
self.assertIn("heartbeat stopped", run.result["failure"]["message"])
def test_worker_records_dry_run_log_path_while_running(self) -> None: def test_worker_records_dry_run_log_path_while_running(self) -> None:
with TemporaryDirectory() as tmp: with TemporaryDirectory() as tmp:
GlobalConfig.objects.create(name="default", backup_root=str(Path(tmp) / "backups")) GlobalConfig.objects.create(name="default", backup_root=str(Path(tmp) / "backups"))

View File

@@ -1373,6 +1373,29 @@ class ViewTests(TestCase):
self.assertContains(response, "Cancel run") self.assertContains(response, "Cancel run")
self.assertContains(response, reverse("cancel_run", args=[run.id])) self.assertContains(response, reverse("cancel_run", args=[run.id]))
def test_run_detail_renders_worker_execution_metadata(self) -> None:
self.client.force_login(self.staff_user)
host = HostConfig.objects.create(host="web-01", address="web-01.example.test")
run = BackupRun.objects.create(
host=host,
status=BackupRun.Status.RUNNING,
result={
"execution": {
"worker_host": "backup-01",
"worker_pid": 4242,
"heartbeat_at": "2026-05-21T10:30:00+00:00",
}
},
)
response = self.client.get(reverse("run_detail", args=[run.id]))
self.assertEqual(response.status_code, 200)
self.assertContains(response, "Worker:")
self.assertContains(response, "backup-01")
self.assertContains(response, "pid 4242")
self.assertContains(response, "Worker heartbeat:")
def test_cancel_run_marks_queued_run_cancelled(self) -> None: def test_cancel_run_marks_queued_run_cancelled(self) -> None:
self.client.force_login(self.staff_user) self.client.force_login(self.staff_user)
host = HostConfig.objects.create(host="web-01", address="web-01.example.test") host = HostConfig.objects.create(host="web-01", address="web-01.example.test")

View File

@@ -448,6 +448,7 @@ def run_detail(request, run_id: int):
rsync_result = result.get("rsync") if isinstance(result.get("rsync"), dict) else {} rsync_result = result.get("rsync") if isinstance(result.get("rsync"), dict) else {}
failure = result.get("failure") if isinstance(result.get("failure"), dict) else {} failure = result.get("failure") if isinstance(result.get("failure"), dict) else {}
prune_result = result.get("prune") if isinstance(result.get("prune"), dict) else {} prune_result = result.get("prune") if isinstance(result.get("prune"), dict) else {}
execution = result.get("execution") if isinstance(result.get("execution"), dict) else {}
rsync_log_path = _run_rsync_log_path(run) rsync_log_path = _run_rsync_log_path(run)
rsync_log_tail = _run_rsync_log_tail(rsync_result, rsync_log_path) rsync_log_tail = _run_rsync_log_tail(rsync_result, rsync_log_path)
requested = result.get("requested") if isinstance(result.get("requested"), dict) else {} requested = result.get("requested") if isinstance(result.get("requested"), dict) else {}
@@ -455,6 +456,7 @@ def run_detail(request, run_id: int):
"run": run, "run": run,
"can_cancel": run.status in {BackupRun.Status.QUEUED, BackupRun.Status.RUNNING}, "can_cancel": run.status in {BackupRun.Status.QUEUED, BackupRun.Status.RUNNING},
"requested": requested, "requested": requested,
"execution": execution,
"stats": run_stats if isinstance(run_stats, dict) else {}, "stats": run_stats if isinstance(run_stats, dict) else {},
"rsync": rsync_result, "rsync": rsync_result,
"rsync_command": _run_rsync_command(rsync_result), "rsync_command": _run_rsync_command(rsync_result),