diff --git a/src/pobsync_backend/backup_runner.py b/src/pobsync_backend/backup_runner.py index 68bbb6f..d75c07c 100644 --- a/src/pobsync_backend/backup_runner.py +++ b/src/pobsync_backend/backup_runner.py @@ -1,6 +1,8 @@ from __future__ import annotations -from datetime import timedelta +import os +import socket +from datetime import timedelta, timezone as datetime_timezone from pathlib import Path from django.db import transaction @@ -158,10 +160,10 @@ def claim_next_queued_run() -> BackupRun | None: return run -def reconcile_running_runs(*, grace_seconds: int = 300) -> int: +def reconcile_running_runs(*, grace_seconds: int = 300, stale_worker_seconds: int = 24 * 60 * 60) -> int: reconciled = 0 for run in BackupRun.objects.select_related("host").filter(status=BackupRun.Status.RUNNING).order_by("started_at", "id"): - if _reconcile_running_run(run=run, grace_seconds=grace_seconds): + if _reconcile_running_run(run=run, grace_seconds=grace_seconds, stale_worker_seconds=stale_worker_seconds): reconciled += 1 return reconciled @@ -176,7 +178,9 @@ def requested_options(run: BackupRun) -> dict[str, object]: def _running_result(*, run: BackupRun, dry_run: bool) -> dict[str, object]: result = dict(run.result) if isinstance(run.result, dict) else {} execution = { + **_worker_execution_details(), "started_at": (run.started_at or timezone.now()).isoformat(), + "heartbeat_at": timezone.now().isoformat(), } if dry_run: execution["log"] = str(dry_run_log_path(run.host.host, run_id=run.id)) @@ -185,24 +189,56 @@ def _running_result(*, run: BackupRun, dry_run: bool) -> dict[str, object]: def _run_cancel_requested(run_id: int) -> bool: - return BackupRun.objects.filter(id=run_id, status=BackupRun.Status.CANCELLED).exists() + try: + run = BackupRun.objects.only("id", "status", "result").get(id=run_id) + except BackupRun.DoesNotExist: + return True + if run.status == BackupRun.Status.CANCELLED: + return True + if run.status == BackupRun.Status.RUNNING: + _refresh_run_heartbeat(run) + return False -def _reconcile_running_run(*, run: BackupRun, grace_seconds: int) -> bool: +def _reconcile_running_run(*, run: BackupRun, grace_seconds: int, stale_worker_seconds: int) -> bool: result = run.result if isinstance(run.result, dict) else {} requested = result.get("requested") if isinstance(result.get("requested"), dict) else {} + stale_worker = _running_worker_timed_out(run=run, stale_worker_seconds=stale_worker_seconds) if not requested.get("dry_run"): + if stale_worker: + result.update( + { + "ok": False, + "host": run.host.host, + "failure": { + "category": "worker", + "message": "The worker heartbeat stopped before the run finished.", + "hint": "Check pobsync-worker.service logs before retrying the backup.", + }, + } + ) + run.status = BackupRun.Status.FAILED + run.ended_at = timezone.now() + run.result = result + run.save(update_fields=["status", "ended_at", "result"]) + return True return False log_path = _execution_log_path(result) log_tail = _read_log_tail(log_path) if log_path is not None else [] terminal_log = _terminal_rsync_log(log_tail) timed_out = _running_dry_run_timed_out(run=run, grace_seconds=grace_seconds) - if not terminal_log and not timed_out: + if not terminal_log and not timed_out and not stale_worker: return False - exit_code = _exit_code_from_log(log_tail) or (124 if timed_out else 255) + exit_code = _exit_code_from_log(log_tail) or (124 if timed_out or stale_worker else 255) failure = classify_rsync_failure(exit_code, log_tail) + if stale_worker and not terminal_log: + failure = { + "category": "worker", + "message": "The worker heartbeat stopped before the dry-run finished.", + "hint": "Check pobsync-worker.service logs before retrying the dry-run.", + } result.update( { "ok": False, @@ -226,6 +262,30 @@ def _reconcile_running_run(*, run: BackupRun, grace_seconds: int) -> bool: return True +def _worker_execution_details() -> dict[str, object]: + return { + "worker_pid": os.getpid(), + "worker_host": socket.gethostname(), + "claimed_at": timezone.now().isoformat(), + } + + +def _refresh_run_heartbeat(run: BackupRun, *, interval_seconds: int = 30) -> None: + result = run.result if isinstance(run.result, dict) else {} + execution = result.get("execution") if isinstance(result.get("execution"), dict) else {} + heartbeat_at = _parse_iso_datetime(execution.get("heartbeat_at")) + if heartbeat_at is not None and timezone.now() < heartbeat_at + timedelta(seconds=interval_seconds): + return + result["execution"] = { + **execution, + "worker_pid": os.getpid(), + "worker_host": socket.gethostname(), + "heartbeat_at": timezone.now().isoformat(), + } + run.result = result + run.save(update_fields=["result"]) + + def _execution_log_path(result: dict[str, object]) -> Path | None: execution = result.get("execution") if isinstance(result.get("execution"), dict) else {} log = execution.get("log") or result.get("log") @@ -266,3 +326,28 @@ def _running_dry_run_timed_out(*, run: BackupRun, grace_seconds: int) -> bool: if not isinstance(timeout_seconds, int) or timeout_seconds <= 0: timeout_seconds = DEFAULT_DRY_RUN_TIMEOUT_SECONDS return timezone.now() >= run.started_at + timedelta(seconds=timeout_seconds + grace_seconds) + + +def _running_worker_timed_out(*, run: BackupRun, stale_worker_seconds: int) -> bool: + if stale_worker_seconds <= 0: + return False + result = run.result if isinstance(run.result, dict) else {} + execution = result.get("execution") if isinstance(result.get("execution"), dict) else {} + heartbeat_at = _parse_iso_datetime(execution.get("heartbeat_at")) + if heartbeat_at is None: + heartbeat_at = run.started_at + if heartbeat_at is None: + return False + return timezone.now() >= heartbeat_at + timedelta(seconds=stale_worker_seconds) + + +def _parse_iso_datetime(value: object): + if not isinstance(value, str) or not value: + return None + try: + parsed = timezone.datetime.fromisoformat(value) + except ValueError: + return None + if timezone.is_naive(parsed): + return timezone.make_aware(parsed, timezone=datetime_timezone.utc) + return parsed diff --git a/src/pobsync_backend/management/commands/run_pobsync_worker.py b/src/pobsync_backend/management/commands/run_pobsync_worker.py index 1b3b200..b44ff91 100644 --- a/src/pobsync_backend/management/commands/run_pobsync_worker.py +++ b/src/pobsync_backend/management/commands/run_pobsync_worker.py @@ -19,6 +19,12 @@ class Command(BaseCommand): parser.add_argument("--once", action="store_true", help="Process one queued run and exit") parser.add_argument("--loop", action="store_true", help="Keep checking for queued runs") parser.add_argument("--interval", type=int, default=15, help="Loop interval in seconds") + parser.add_argument( + "--stale-running-seconds", + type=int, + default=24 * 60 * 60, + help="Mark running runs failed after this many seconds without a worker heartbeat; use 0 to disable", + ) def handle(self, *args: Any, **options: Any) -> None: if not options["once"] and not options["loop"]: @@ -26,14 +32,14 @@ class Command(BaseCommand): paths = PobsyncPaths(home=Path(options["prefix"])) while True: - count = self._run_once(prefix=paths.home) + count = self._run_once(prefix=paths.home, stale_running_seconds=int(options["stale_running_seconds"])) self.stdout.write(f"Ran {count} queued backup run(s).") if options["once"]: return time.sleep(max(1, int(options["interval"]))) - def _run_once(self, *, prefix: Path) -> int: - reconciled = reconcile_running_runs() + def _run_once(self, *, prefix: Path, stale_running_seconds: int = 24 * 60 * 60) -> int: + reconciled = reconcile_running_runs(stale_worker_seconds=stale_running_seconds) run = claim_next_queued_run() if run is None: return reconciled diff --git a/src/pobsync_backend/templates/pobsync_backend/run_detail.html b/src/pobsync_backend/templates/pobsync_backend/run_detail.html index 6d82121..7526e33 100644 --- a/src/pobsync_backend/templates/pobsync_backend/run_detail.html +++ b/src/pobsync_backend/templates/pobsync_backend/run_detail.html @@ -79,6 +79,10 @@