2026-05-19 13:00:12 +02:00
|
|
|
from __future__ import annotations
|
|
|
|
|
|
2026-05-21 03:16:38 +02:00
|
|
|
import os
import re
import socket
from collections import deque
from datetime import datetime, timedelta, timezone as datetime_timezone
from pathlib import Path
|
|
|
|
|
|
|
|
|
|
from django.db import transaction
|
|
|
|
|
from django.utils import timezone
|
|
|
|
|
|
2026-05-23 00:23:14 +02:00
|
|
|
from pobsync.commands.run_scheduled import (
|
|
|
|
|
DEFAULT_DRY_RUN_TIMEOUT_SECONDS,
|
|
|
|
|
classify_rsync_failure,
|
|
|
|
|
classify_rsync_warning,
|
|
|
|
|
dry_run_log_path,
|
|
|
|
|
run_scheduled,
|
|
|
|
|
)
|
2026-05-19 13:00:12 +02:00
|
|
|
from pobsync_backend.config_source import DjangoConfigSource
|
|
|
|
|
from pobsync_backend.models import BackupRun, HostConfig
|
|
|
|
|
from pobsync_backend.retention import run_sql_retention_apply
|
|
|
|
|
from pobsync_backend.snapshot_discovery import infer_snapshot_kind, upsert_snapshot_record
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def queue_backup_run(
    *,
    host: HostConfig,
    run_type: str = BackupRun.RunType.MANUAL,
    dry_run: bool = False,
    verbose_output: bool = False,
    prune: bool = False,
    prune_max_delete: int = 10,
    prune_protect_bases: bool = False,
) -> BackupRun:
    """Create a QUEUED BackupRun for ``host`` and record the requested options.

    The options are stored under ``result["requested"]`` so a worker can read
    them back later. A dry-run always requests verbose output.

    Returns:
        The newly created BackupRun row.
    """
    requested = {
        "dry_run": bool(dry_run),
        # Dry-runs are always verbose, regardless of ``verbose_output``.
        "verbose_output": bool(dry_run or verbose_output),
        "prune": bool(prune),
        "prune_max_delete": int(prune_max_delete),
        "prune_protect_bases": bool(prune_protect_bases),
    }
    return BackupRun.objects.create(
        host=host,
        run_type=run_type,
        status=BackupRun.Status.QUEUED,
        result={"requested": requested},
    )
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def execute_backup_run(
    *,
    run: BackupRun,
    prefix: Path,
    dry_run: bool = False,
    verbose_output: bool = False,
    prune: bool = False,
    prune_max_delete: int = 10,
    prune_protect_bases: bool = False,
) -> BackupRun:
    """Execute ``run`` synchronously via ``run_scheduled`` and persist the outcome.

    The run is marked RUNNING with fresh execution details, then executed.
    Afterwards the following are written back:
    - the final status: CANCELLED, WARNING, SUCCESS or FAILED;
    - the snapshot and base paths and the rsync exit code;
    - the result payload and a link to the discovered snapshot record.

    When ``prune`` is requested, the backup succeeded, was not a dry-run and was
    not cancelled, SQL retention is applied afterwards. A retention error is
    recorded under ``result["prune"]`` and downgrades the run to WARNING.

    If the run row is deleted while executing (``_run_cancel_requested`` treats
    that as a cancel request), nothing is written back. Any exception is
    re-raised, and the returned object only reflects the outcome in memory.

    Args:
        run: The BackupRun to execute.
        prefix: Backup root passed to ``run_scheduled`` and to retention.
        dry_run: Run rsync as a dry-run; also forces verbose output.
        verbose_output: Request verbose rsync output.
        prune: Apply SQL retention after a successful real backup.
        prune_max_delete: Upper bound on snapshots retention may delete.
        prune_protect_bases: Forwarded to retention as ``protect_bases``.

    Returns:
        The updated ``run`` instance.

    Raises:
        Whatever ``run_scheduled`` raises, after recording it on the run.
    """
    run.status = BackupRun.Status.RUNNING
    run.started_at = run.started_at or timezone.now()
    run.result = _running_result(run=run, dry_run=bool(dry_run))
    run.save(update_fields=["status", "started_at", "result"])

    try:
        result = run_scheduled(
            prefix=prefix,
            host=run.host.host,
            dry_run=bool(dry_run),
            # Retention is handled below by run_sql_retention_apply, not here.
            prune=False,
            config_source=DjangoConfigSource(),
            run_id=run.id,
            cancel_check=lambda: _run_cancel_requested(run.id),
            verbose_output=bool(dry_run or verbose_output),
            state_callback=lambda state: _record_running_state(run.id, state),
        )
    except Exception as exc:
        _record_execution_error(run, exc)
        raise

    try:
        # Pick up a CANCELLED status set by another process while we ran.
        run.refresh_from_db()
    except BackupRun.DoesNotExist:
        # The row was deleted mid-run: report the outcome in memory only.
        run.status = BackupRun.Status.CANCELLED
        run.ended_at = timezone.now()
        run.result = result
        return run

    if result.get("cancelled") or run.status == BackupRun.Status.CANCELLED:
        run.status = BackupRun.Status.CANCELLED
    elif result.get("status") == BackupRun.Status.WARNING:
        run.status = BackupRun.Status.WARNING
    else:
        run.status = BackupRun.Status.SUCCESS if result.get("ok") else BackupRun.Status.FAILED
    run.ended_at = timezone.now()
    run.snapshot_path = str(result.get("snapshot") or "")
    run.base_path = str(result.get("base") or "")
    rsync = result.get("rsync") if isinstance(result.get("rsync"), dict) else {}
    run.rsync_exit_code = rsync.get("exit_code")

    snapshot_record = None
    if run.snapshot_path:
        snapshot_path = Path(run.snapshot_path)
        try:
            kind = infer_snapshot_kind(snapshot_path)
            snapshot_record, _created = upsert_snapshot_record(host=run.host, kind=kind, snapshot_dir=snapshot_path)
        except ValueError:
            # Snapshot discovery failures are tolerated; the run is stored
            # without a snapshot link.
            snapshot_record = None

    should_prune = (
        prune
        and result.get("ok")
        and not result.get("dry_run")
        # Never delete snapshots on behalf of a run the user cancelled.
        and run.status != BackupRun.Status.CANCELLED
    )
    if should_prune:
        try:
            result["prune"] = run_sql_retention_apply(
                prefix=prefix,
                host=run.host.host,
                kind="scheduled",
                protect_bases=bool(prune_protect_bases),
                yes=True,
                max_delete=int(prune_max_delete),
                action=run.run_type,
                acquire_lock=False,
            )
        except Exception as exc:
            # A retention problem must not turn a good backup into a failure.
            result["prune"] = {"ok": False, "error": str(exc), "type": type(exc).__name__}
            run.status = BackupRun.Status.WARNING

    # Single final write; the prune path previously saved the same data twice.
    run.snapshot = snapshot_record
    run.result = result
    run.save(
        update_fields=[
            "status",
            "ended_at",
            "snapshot_path",
            "snapshot",
            "base_path",
            "rsync_exit_code",
            "result",
        ],
    )
    return run


def _record_execution_error(run: BackupRun, exc: Exception) -> None:
    """Persist an exception raised by ``run_scheduled`` on ``run``.

    A CANCELLED status written by another process is kept; otherwise the run
    becomes FAILED. The existing result dict is extended with ``ok``,
    ``error`` and ``type``. If the row no longer exists, nothing is written, so
    the caller can re-raise the original exception unmasked.
    """
    try:
        run.refresh_from_db()
    except BackupRun.DoesNotExist:
        return
    run.status = BackupRun.Status.CANCELLED if run.status == BackupRun.Status.CANCELLED else BackupRun.Status.FAILED
    run.ended_at = timezone.now()
    run.result = {
        **(run.result if isinstance(run.result, dict) else {}),
        "ok": False,
        "error": str(exc),
        "type": type(exc).__name__,
    }
    run.save(update_fields=["status", "ended_at", "result"])
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def claim_next_queued_run() -> BackupRun | None:
    """Move the oldest QUEUED run of an enabled host to RUNNING and return it.

    Candidates are ordered by ``created_at`` then ``id``. The chosen row is
    re-read with ``SELECT ... FOR UPDATE`` and must still be QUEUED. If another
    worker claimed it in the meantime, it is skipped and the next candidate is
    tried. On backends with row locks this stops two workers from claiming the
    same run. Django ignores ``select_for_update`` on SQLite.

    Returns:
        The claimed run (with ``host`` loaded), or None when nothing is queued.
    """
    skipped: list[int] = []
    while True:
        with transaction.atomic():
            run = (
                BackupRun.objects.select_related("host")
                .filter(status=BackupRun.Status.QUEUED, host__enabled=True)
                .exclude(pk__in=skipped)
                .order_by("created_at", "id")
                .first()
            )
            if run is None:
                return None
            # Lock only the run row and confirm nobody claimed it since the read above.
            still_queued = (
                BackupRun.objects.select_for_update()
                .filter(pk=run.pk, status=BackupRun.Status.QUEUED)
                .values_list("pk", flat=True)
                .first()
            )
            if still_queued is None:
                # Lost the race for this row; exclude it so the loop always terminates.
                skipped.append(run.pk)
                continue
            run.status = BackupRun.Status.RUNNING
            run.started_at = timezone.now()
            run.save(update_fields=["status", "started_at"])
            return run
|
|
|
|
|
|
|
|
|
|
|
2026-05-21 03:16:38 +02:00
|
|
|
def reconcile_running_runs(*, grace_seconds: int = 300, stale_worker_seconds: int = 24 * 60 * 60) -> int:
    """Close RUNNING runs that can no longer finish; return how many were closed.

    Runs are visited oldest first (``started_at``, then ``id``), and each is
    handed to ``_reconcile_running_run`` with the given thresholds.
    """
    running = (
        BackupRun.objects.select_related("host")
        .filter(status=BackupRun.Status.RUNNING)
        .order_by("started_at", "id")
    )
    return sum(
        1
        for candidate in running
        if _reconcile_running_run(
            run=candidate,
            grace_seconds=grace_seconds,
            stale_worker_seconds=stale_worker_seconds,
        )
    )
|
|
|
|
|
|
|
|
|
|
|
2026-05-19 13:00:12 +02:00
|
|
|
def requested_options(run: BackupRun) -> dict[str, object]:
    """Return the options stored under ``run.result["requested"]``.

    The stored dict itself is returned (not a copy). ``{}`` is returned when
    ``result`` is not a dict or holds no ``requested`` dict.
    """
    if not isinstance(run.result, dict):
        return {}
    requested = run.result.get("requested")
    return requested if isinstance(requested, dict) else {}
|
2026-05-19 20:46:10 +02:00
|
|
|
|
|
|
|
|
|
|
|
|
|
def _running_result(*, run: BackupRun, dry_run: bool) -> dict[str, object]:
    """Build the result payload stored when ``run`` enters RUNNING.

    Starts from a shallow copy of the existing result dict (or ``{}``) and
    replaces its ``"execution"`` entry with:
    - the worker identity from ``_worker_execution_details``;
    - ``started_at`` and a fresh ``heartbeat_at``;
    - for dry-runs, the per-run dry-run log path.
    """
    payload = dict(run.result) if isinstance(run.result, dict) else {}
    execution = _worker_execution_details()
    execution["started_at"] = (run.started_at or timezone.now()).isoformat()
    execution["heartbeat_at"] = timezone.now().isoformat()
    if dry_run:
        execution["log"] = str(dry_run_log_path(run.host.host, run_id=run.id))
    payload["execution"] = execution
    return payload
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _run_cancel_requested(run_id: int) -> bool:
    """Report whether run ``run_id`` should stop; also acts as the worker heartbeat.

    A deleted run, or one whose status is CANCELLED, counts as a cancel
    request. While the run is still RUNNING its heartbeat is refreshed; the
    refresh rate is throttled by ``_refresh_run_heartbeat``.
    """
    row = BackupRun.objects.only("id", "status", "result").filter(id=run_id).first()
    if row is None or row.status == BackupRun.Status.CANCELLED:
        return True
    if row.status == BackupRun.Status.RUNNING:
        _refresh_run_heartbeat(row)
    return False
|
2026-05-19 21:10:08 +02:00
|
|
|
|
|
|
|
|
|
2026-05-23 00:23:14 +02:00
|
|
|
def _record_running_state(run_id: int, state: dict[str, object]) -> None:
    """Merge a progress update from ``run_scheduled`` into a RUNNING run.

    Recognised ``state`` keys:
    - ``log`` (non-empty str): stored as ``result["execution"]["log"]``.
    - ``snapshot`` (non-empty str): stored in the execution dict and as
      ``snapshot_path``.
    - ``rsync`` (dict): merged over ``result["rsync"]``; an int ``exit_code``
      is also copied to ``rsync_exit_code``.

    Worker identity and ``heartbeat_at`` are refreshed on every call. The
    update is dropped if the run no longer exists or is not RUNNING.
    """
    try:
        run = BackupRun.objects.only("id", "status", "result", "snapshot_path", "rsync_exit_code").get(id=run_id)
    except BackupRun.DoesNotExist:
        return
    if run.status != BackupRun.Status.RUNNING:
        return

    def dict_entry(source: dict[str, object], key: str) -> dict:
        value = source.get(key)
        return value if isinstance(value, dict) else {}

    result = run.result if isinstance(run.result, dict) else {}
    execution = dict_entry(result, "execution")
    previous_rsync = dict_entry(result, "rsync")
    incoming_rsync = dict_entry(state, "rsync")

    new_log = state.get("log")
    if isinstance(new_log, str) and new_log:
        execution["log"] = new_log

    new_snapshot = state.get("snapshot")
    if isinstance(new_snapshot, str) and new_snapshot:
        execution["snapshot"] = new_snapshot
        run.snapshot_path = new_snapshot

    if incoming_rsync:
        result["rsync"] = {**previous_rsync, **incoming_rsync}
        reported_code = incoming_rsync.get("exit_code")
        if isinstance(reported_code, int):
            run.rsync_exit_code = reported_code

    refreshed = dict(execution)
    refreshed.update(
        worker_pid=os.getpid(),
        worker_host=socket.gethostname(),
        heartbeat_at=timezone.now().isoformat(),
    )
    result["execution"] = refreshed
    run.result = result
    run.save(update_fields=["snapshot_path", "rsync_exit_code", "result"])
|
|
|
|
|
|
|
|
|
|
|
2026-05-21 03:16:38 +02:00
|
|
|
def _reconcile_running_run(*, run: BackupRun, grace_seconds: int, stale_worker_seconds: int) -> bool:
    """Close one RUNNING run if it evidently cannot finish; return True if closed.

    Evidence:
    - the tail of the run's execution log contains an ``rsync error:`` line
      that ``classify_rsync_warning`` does not treat as a warning;
    - the worker heartbeat is older than ``stale_worker_seconds``;
    - dry-runs only: the dry-run timeout plus ``grace_seconds`` has elapsed.
      The timeout is ``result["timeout_seconds"]``, or
      ``DEFAULT_DRY_RUN_TIMEOUT_SECONDS`` if that is unset.

    A closed run gets status FAILED, ``ended_at`` and a ``failure`` entry in
    its result. A missing exit code defaults to:
    - 255 when an rsync error was logged;
    - 124 for a dry-run closed by timeout or a stale heartbeat.
    """
    result = run.result if isinstance(run.result, dict) else {}
    raw_requested = result.get("requested")
    requested = raw_requested if isinstance(raw_requested, dict) else {}
    log_path = _execution_log_path(result)
    log_tail = [] if log_path is None else _read_log_tail(log_path)
    saw_rsync_error = _terminal_rsync_log(log_tail)
    logged_code = _exit_code_from_log(log_tail)
    heartbeat_lost = _running_worker_timed_out(run=run, stale_worker_seconds=stale_worker_seconds)
    log_text = str(log_path) if log_path else ""
    raw_rsync = result.get("rsync")
    previous_rsync = raw_rsync if isinstance(raw_rsync, dict) else {}

    if not requested.get("dry_run"):
        if saw_rsync_error:
            code = logged_code or 255
            failure = classify_rsync_failure(code, log_tail)
            _close_run_as_failed(
                run,
                result,
                {
                    "ok": False,
                    "host": run.host.host,
                    "log": log_text,
                    "failure": failure,
                    "rsync": {**previous_rsync, "exit_code": code, "log_tail": log_tail},
                },
                rsync_exit_code=code,
            )
            return True
        if heartbeat_lost:
            _close_run_as_failed(
                run,
                result,
                {
                    "ok": False,
                    "host": run.host.host,
                    "failure": {
                        "category": "worker",
                        "message": "The worker heartbeat stopped before the run finished.",
                        "hint": "Check pobsync-worker.service logs before retrying the backup.",
                    },
                },
            )
            return True
        return False

    deadline_passed = _running_dry_run_timed_out(run=run, grace_seconds=grace_seconds)
    if not (saw_rsync_error or deadline_passed or heartbeat_lost):
        return False

    if logged_code:
        code = logged_code
    elif deadline_passed or heartbeat_lost:
        code = 124
    else:
        code = 255
    failure = classify_rsync_failure(code, log_tail)
    if heartbeat_lost and not saw_rsync_error:
        # Without an rsync error in the log, blame the worker, not rsync.
        failure = {
            "category": "worker",
            "message": "The worker heartbeat stopped before the dry-run finished.",
            "hint": "Check pobsync-worker.service logs before retrying the dry-run.",
        }
    _close_run_as_failed(
        run,
        result,
        {
            "ok": False,
            "dry_run": True,
            "host": run.host.host,
            "base": result.get("base"),
            "log": log_text,
            "failure": failure,
            "rsync": {**previous_rsync, "exit_code": code, "log_tail": log_tail},
        },
        rsync_exit_code=code,
    )
    return True


def _close_run_as_failed(
    run: BackupRun,
    result: dict[str, object],
    updates: dict[str, object],
    *,
    rsync_exit_code: int | None = None,
) -> None:
    """Merge ``updates`` into ``result`` and save ``run`` as FAILED, ended now.

    ``rsync_exit_code`` is only written when given.
    """
    result.update(updates)
    run.status = BackupRun.Status.FAILED
    run.ended_at = timezone.now()
    fields = ["status", "ended_at"]
    if rsync_exit_code is not None:
        run.rsync_exit_code = rsync_exit_code
        fields.append("rsync_exit_code")
    fields.append("result")
    run.result = result
    run.save(update_fields=fields)
|
|
|
|
|
|
|
|
|
|
|
2026-05-21 03:16:38 +02:00
|
|
|
def _worker_execution_details() -> dict[str, object]:
    """Describe the current worker process and stamp the claim time.

    Returns a new dict with ``worker_pid``, ``worker_host`` and ``claimed_at``
    (ISO-8601 string from ``timezone.now()``).
    """
    return dict(
        worker_pid=os.getpid(),
        worker_host=socket.gethostname(),
        claimed_at=timezone.now().isoformat(),
    )
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _refresh_run_heartbeat(run: BackupRun, *, interval_seconds: int = 30) -> None:
    """Write a fresh heartbeat to ``run.result["execution"]``, at most once per interval.

    Nothing is saved while the stored ``heartbeat_at`` is younger than
    ``interval_seconds``. Otherwise the worker identity and ``heartbeat_at``
    are updated and only the ``result`` field is saved.
    """
    result = run.result if isinstance(run.result, dict) else {}
    raw_execution = result.get("execution")
    execution = raw_execution if isinstance(raw_execution, dict) else {}
    last_beat = _parse_iso_datetime(execution.get("heartbeat_at"))
    if last_beat is not None and timezone.now() - last_beat < timedelta(seconds=interval_seconds):
        return

    refreshed = dict(execution)
    refreshed.update(
        worker_pid=os.getpid(),
        worker_host=socket.gethostname(),
        heartbeat_at=timezone.now().isoformat(),
    )
    result["execution"] = refreshed
    run.result = result
    run.save(update_fields=["result"])
|
|
|
|
|
|
|
|
|
|
|
2026-05-19 21:10:08 +02:00
|
|
|
def _execution_log_path(result: dict[str, object]) -> Path | None:
    """Return the run's log path from a result payload, if one is recorded.

    ``result["execution"]["log"]`` wins. The top-level ``result["log"]`` is
    the fallback. Only a non-empty string yields a Path; anything else gives
    None.
    """
    execution = result.get("execution")
    candidate = execution.get("log") if isinstance(execution, dict) else None
    if not candidate:
        candidate = result.get("log")
    if isinstance(candidate, str) and candidate:
        return Path(candidate)
    return None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _read_log_tail(log_path: Path | None, *, max_lines: int = 40) -> list[str]:
    """Return up to the last ``max_lines`` lines of the log at ``log_path``.

    The file is streamed through a bounded deque, so only ``max_lines`` lines
    are held in memory even for very large (verbose rsync) logs.

    Lines are split like ``str.splitlines()`` on universal-newline text, and
    undecodable bytes are replaced.

    Returns ``[]`` when:
    - ``log_path`` is None;
    - the file cannot be read;
    - ``max_lines`` is not positive.
    """
    if log_path is None or max_lines <= 0:
        return []
    tail: deque[str] = deque(maxlen=max_lines)
    try:
        with log_path.open(encoding="utf-8", errors="replace") as handle:
            for physical_line in handle:
                # Text-mode iteration splits only on \n/\r/\r\n. splitlines() also
                # splits on \x0b, \x0c, \x1c-\x1e, \x85, \u2028 and \u2029, keeping
                # str.splitlines() semantics for the whole text.
                tail.extend(physical_line.splitlines())
    except OSError:
        return []
    return list(tail)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _terminal_rsync_log(log_tail: list[str]) -> bool:
    """Tell whether ``log_tail`` shows rsync ending with a real error.

    A tail that ``classify_rsync_warning`` recognises as a warning is never
    terminal. Otherwise any line starting with ``rsync error:`` makes it
    terminal.
    """
    if classify_rsync_warning(_exit_code_from_log(log_tail), log_tail) is not None:
        return False
    for line in log_tail:
        if line.startswith("rsync error:"):
            return True
    return False
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _exit_code_from_log(log_tail: list[str]) -> int | None:
|
|
|
|
|
for line in reversed(log_tail):
|
|
|
|
|
if "code 255" in line:
|
|
|
|
|
return 255
|
2026-05-23 00:23:14 +02:00
|
|
|
if "code 24" in line:
|
|
|
|
|
return 24
|
2026-05-19 21:10:08 +02:00
|
|
|
if "code 124" in line:
|
|
|
|
|
return 124
|
|
|
|
|
if "code 12" in line:
|
|
|
|
|
return 12
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _running_dry_run_timed_out(*, run: BackupRun, grace_seconds: int) -> bool:
    """Tell whether a dry-run has outlived its timeout plus ``grace_seconds``.

    The timeout is ``result["timeout_seconds"]`` when it is a positive int,
    otherwise ``DEFAULT_DRY_RUN_TIMEOUT_SECONDS``. A run without
    ``started_at`` never times out.
    """
    started = run.started_at
    if started is None:
        return False
    payload = run.result if isinstance(run.result, dict) else {}
    configured = payload.get("timeout_seconds")
    limit = configured if isinstance(configured, int) and configured > 0 else DEFAULT_DRY_RUN_TIMEOUT_SECONDS
    deadline = started + timedelta(seconds=limit + grace_seconds)
    return timezone.now() >= deadline
|
2026-05-21 03:16:38 +02:00
|
|
|
|
|
|
|
|
|
|
|
|
|
def _running_worker_timed_out(*, run: BackupRun, stale_worker_seconds: int) -> bool:
    """Tell whether the worker heartbeat of ``run`` is older than the threshold.

    The last sign of life is ``result["execution"]["heartbeat_at"]``, falling
    back to ``started_at``. A non-positive threshold disables the check, and a
    run with neither timestamp is never considered stale.
    """
    if stale_worker_seconds <= 0:
        return False
    payload = run.result if isinstance(run.result, dict) else {}
    execution = payload.get("execution")
    if not isinstance(execution, dict):
        execution = {}
    last_seen = _parse_iso_datetime(execution.get("heartbeat_at"))
    if last_seen is None:
        last_seen = run.started_at
    if last_seen is None:
        return False
    return timezone.now() >= last_seen + timedelta(seconds=stale_worker_seconds)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _parse_iso_datetime(value: object) -> datetime | None:
    """Parse an ISO-8601 string into an aware datetime.

    Uses the stdlib ``datetime.fromisoformat`` rather than
    ``django.utils.timezone.datetime``, which only works because that module
    happens to import the class.

    Naive results are interpreted as UTC. This matches what Django's
    ``is_naive``/``make_aware`` do for a fixed UTC tzinfo.

    Returns:
        The parsed datetime, or None for non-strings, empty strings and
        unparsable input.
    """
    if not isinstance(value, str) or not value:
        return None
    try:
        parsed = datetime.fromisoformat(value)
    except ValueError:
        return None
    if parsed.utcoffset() is None:
        return parsed.replace(tzinfo=datetime_timezone.utc)
    return parsed
|