from __future__ import annotations import os import socket from datetime import timedelta, timezone as datetime_timezone from pathlib import Path from django.db import transaction from django.utils import timezone from pobsync.commands.run_scheduled import ( DEFAULT_DRY_RUN_TIMEOUT_SECONDS, classify_rsync_failure, classify_rsync_warning, dry_run_log_path, run_scheduled, ) from pobsync_backend.config_source import DjangoConfigSource from pobsync_backend.models import BackupRun, HostConfig from pobsync_backend.retention import run_sql_retention_apply from pobsync_backend.snapshot_discovery import infer_snapshot_kind, upsert_snapshot_record def queue_backup_run( *, host: HostConfig, run_type: str = BackupRun.RunType.MANUAL, dry_run: bool = False, verbose_output: bool = False, prune: bool = False, prune_max_delete: int = 10, prune_protect_bases: bool = False, ) -> BackupRun: return BackupRun.objects.create( host=host, run_type=run_type, status=BackupRun.Status.QUEUED, result={ "requested": { "dry_run": bool(dry_run), "verbose_output": bool(dry_run or verbose_output), "prune": bool(prune), "prune_max_delete": int(prune_max_delete), "prune_protect_bases": bool(prune_protect_bases), } }, ) def execute_backup_run( *, run: BackupRun, prefix: Path, dry_run: bool = False, verbose_output: bool = False, prune: bool = False, prune_max_delete: int = 10, prune_protect_bases: bool = False, ) -> BackupRun: run.status = BackupRun.Status.RUNNING run.started_at = run.started_at or timezone.now() run.result = _running_result(run=run, dry_run=bool(dry_run)) run.save(update_fields=["status", "started_at", "result"]) try: result = run_scheduled( prefix=prefix, host=run.host.host, dry_run=bool(dry_run), prune=False, config_source=DjangoConfigSource(), run_id=run.id, cancel_check=lambda: _run_cancel_requested(run.id), verbose_output=bool(dry_run or verbose_output), state_callback=lambda state: _record_running_state(run.id, state), ) except Exception as exc: run.refresh_from_db() run.status = BackupRun.Status.CANCELLED if run.status == BackupRun.Status.CANCELLED else BackupRun.Status.FAILED run.ended_at = timezone.now() run.result = { **(run.result if isinstance(run.result, dict) else {}), "ok": False, "error": str(exc), "type": type(exc).__name__, } run.save(update_fields=["status", "ended_at", "result"]) raise run.refresh_from_db() if result.get("cancelled") or run.status == BackupRun.Status.CANCELLED: run.status = BackupRun.Status.CANCELLED elif result.get("status") == BackupRun.Status.WARNING: run.status = BackupRun.Status.WARNING else: run.status = BackupRun.Status.SUCCESS if result.get("ok") else BackupRun.Status.FAILED run.ended_at = timezone.now() run.snapshot_path = str(result.get("snapshot") or "") run.base_path = str(result.get("base") or "") rsync = result.get("rsync") if isinstance(result.get("rsync"), dict) else {} run.rsync_exit_code = rsync.get("exit_code") run.result = result snapshot_record = None if run.snapshot_path: snapshot_path = Path(run.snapshot_path) try: kind = infer_snapshot_kind(snapshot_path) snapshot_record, _created = upsert_snapshot_record(host=run.host, kind=kind, snapshot_dir=snapshot_path) except ValueError: snapshot_record = None if result.get("ok") and not result.get("dry_run") and prune: try: result["prune"] = run_sql_retention_apply( prefix=prefix, host=run.host.host, kind="scheduled", protect_bases=bool(prune_protect_bases), yes=True, max_delete=int(prune_max_delete), action=run.run_type, acquire_lock=False, ) except Exception as exc: result["prune"] = {"ok": False, "error": str(exc), "type": type(exc).__name__} run.status = BackupRun.Status.WARNING run.result = result run.snapshot = snapshot_record run.save( update_fields=[ "status", "ended_at", "snapshot_path", "snapshot", "base_path", "rsync_exit_code", "result", ], ) run.snapshot = snapshot_record run.result = result run.save( update_fields=[ "status", "ended_at", "snapshot_path", "snapshot", "base_path", "rsync_exit_code", "result", ], ) return run def claim_next_queued_run() -> BackupRun | None: with transaction.atomic(): run = ( BackupRun.objects.select_related("host") .filter(status=BackupRun.Status.QUEUED, host__enabled=True) .order_by("created_at", "id") .first() ) if run is None: return None run.status = BackupRun.Status.RUNNING run.started_at = timezone.now() run.save(update_fields=["status", "started_at"]) return run def reconcile_running_runs(*, grace_seconds: int = 300, stale_worker_seconds: int = 24 * 60 * 60) -> int: reconciled = 0 for run in BackupRun.objects.select_related("host").filter(status=BackupRun.Status.RUNNING).order_by("started_at", "id"): if _reconcile_running_run(run=run, grace_seconds=grace_seconds, stale_worker_seconds=stale_worker_seconds): reconciled += 1 return reconciled def requested_options(run: BackupRun) -> dict[str, object]: requested = run.result.get("requested") if isinstance(run.result, dict) else None if not isinstance(requested, dict): return {} return requested def _running_result(*, run: BackupRun, dry_run: bool) -> dict[str, object]: result = dict(run.result) if isinstance(run.result, dict) else {} execution = { **_worker_execution_details(), "started_at": (run.started_at or timezone.now()).isoformat(), "heartbeat_at": timezone.now().isoformat(), } if dry_run: execution["log"] = str(dry_run_log_path(run.host.host, run_id=run.id)) result["execution"] = execution return result def _run_cancel_requested(run_id: int) -> bool: try: run = BackupRun.objects.only("id", "status", "result").get(id=run_id) except BackupRun.DoesNotExist: return True if run.status == BackupRun.Status.CANCELLED: return True if run.status == BackupRun.Status.RUNNING: _refresh_run_heartbeat(run) return False def _record_running_state(run_id: int, state: dict[str, object]) -> None: try: run = BackupRun.objects.only("id", "status", "result", "snapshot_path", "rsync_exit_code").get(id=run_id) except BackupRun.DoesNotExist: return if run.status != BackupRun.Status.RUNNING: return result = run.result if isinstance(run.result, dict) else {} execution = result.get("execution") if isinstance(result.get("execution"), dict) else {} rsync = result.get("rsync") if isinstance(result.get("rsync"), dict) else {} incoming_rsync = state.get("rsync") if isinstance(state.get("rsync"), dict) else {} log_path = state.get("log") snapshot_path = state.get("snapshot") phase = state.get("phase") if isinstance(phase, str) and phase: execution["phase"] = phase if isinstance(log_path, str) and log_path: execution["log"] = log_path if isinstance(snapshot_path, str) and snapshot_path: execution["snapshot"] = snapshot_path run.snapshot_path = snapshot_path if incoming_rsync: result["rsync"] = {**rsync, **incoming_rsync} exit_code = incoming_rsync.get("exit_code") if isinstance(exit_code, int): run.rsync_exit_code = exit_code result["execution"] = { **execution, "worker_pid": os.getpid(), "worker_host": socket.gethostname(), "heartbeat_at": timezone.now().isoformat(), } run.result = result run.save(update_fields=["snapshot_path", "rsync_exit_code", "result"]) def _reconcile_running_run(*, run: BackupRun, grace_seconds: int, stale_worker_seconds: int) -> bool: result = run.result if isinstance(run.result, dict) else {} requested = result.get("requested") if isinstance(result.get("requested"), dict) else {} log_path = _execution_log_path(result) log_tail = _read_log_tail(log_path) if log_path is not None else [] terminal_log = _terminal_rsync_log(log_tail) exit_code = _exit_code_from_log(log_tail) stale_worker = _running_worker_timed_out(run=run, stale_worker_seconds=stale_worker_seconds) if not requested.get("dry_run"): if terminal_log: failure = classify_rsync_failure(exit_code or 255, log_tail) result.update( { "ok": False, "host": run.host.host, "log": str(log_path) if log_path else "", "failure": failure, "rsync": { **(result.get("rsync") if isinstance(result.get("rsync"), dict) else {}), "exit_code": exit_code or 255, "log_tail": log_tail, }, } ) run.status = BackupRun.Status.FAILED run.ended_at = timezone.now() run.rsync_exit_code = exit_code or 255 run.result = result run.save(update_fields=["status", "ended_at", "rsync_exit_code", "result"]) return True if _running_rsync_process_missing(run=run, grace_seconds=grace_seconds): result.update( { "ok": False, "host": run.host.host, "log": str(log_path) if log_path else "", "failure": { "category": "rsync_process", "message": "The rsync process is no longer running while the backup is still marked running.", "hint": "Check the rsync log and pobsync-worker.service logs before retrying the backup.", }, "rsync": { **(result.get("rsync") if isinstance(result.get("rsync"), dict) else {}), "exit_code": exit_code or 255, "log_tail": log_tail, }, } ) run.status = BackupRun.Status.FAILED run.ended_at = timezone.now() run.rsync_exit_code = exit_code or 255 run.result = result run.save(update_fields=["status", "ended_at", "rsync_exit_code", "result"]) return True if stale_worker: result.update( { "ok": False, "host": run.host.host, "failure": { "category": "worker", "message": "The worker heartbeat stopped before the run finished.", "hint": "Check pobsync-worker.service logs before retrying the backup.", }, } ) run.status = BackupRun.Status.FAILED run.ended_at = timezone.now() run.result = result run.save(update_fields=["status", "ended_at", "result"]) return True return False timed_out = _running_dry_run_timed_out(run=run, grace_seconds=grace_seconds) if not terminal_log and not timed_out and not stale_worker: return False exit_code = exit_code or (124 if timed_out or stale_worker else 255) failure = classify_rsync_failure(exit_code, log_tail) if stale_worker and not terminal_log: failure = { "category": "worker", "message": "The worker heartbeat stopped before the dry-run finished.", "hint": "Check pobsync-worker.service logs before retrying the dry-run.", } result.update( { "ok": False, "dry_run": True, "host": run.host.host, "base": result.get("base"), "log": str(log_path) if log_path else "", "failure": failure, "rsync": { **(result.get("rsync") if isinstance(result.get("rsync"), dict) else {}), "exit_code": exit_code, "log_tail": log_tail, }, } ) run.status = BackupRun.Status.FAILED run.ended_at = timezone.now() run.rsync_exit_code = exit_code run.result = result run.save(update_fields=["status", "ended_at", "rsync_exit_code", "result"]) return True def _worker_execution_details() -> dict[str, object]: return { "worker_pid": os.getpid(), "worker_host": socket.gethostname(), "claimed_at": timezone.now().isoformat(), } def _refresh_run_heartbeat(run: BackupRun, *, interval_seconds: int = 30) -> None: result = run.result if isinstance(run.result, dict) else {} execution = result.get("execution") if isinstance(result.get("execution"), dict) else {} heartbeat_at = _parse_iso_datetime(execution.get("heartbeat_at")) if heartbeat_at is not None and timezone.now() < heartbeat_at + timedelta(seconds=interval_seconds): return result["execution"] = { **execution, "worker_pid": os.getpid(), "worker_host": socket.gethostname(), "heartbeat_at": timezone.now().isoformat(), } run.result = result run.save(update_fields=["result"]) def _execution_log_path(result: dict[str, object]) -> Path | None: execution = result.get("execution") if isinstance(result.get("execution"), dict) else {} log = execution.get("log") or result.get("log") if not isinstance(log, str) or not log: return None return Path(log) def _read_log_tail(log_path: Path | None, *, max_lines: int = 40) -> list[str]: if log_path is None: return [] try: return log_path.read_text(encoding="utf-8", errors="replace").splitlines()[-max_lines:] except OSError: return [] def _terminal_rsync_log(log_tail: list[str]) -> bool: warning = classify_rsync_warning(_exit_code_from_log(log_tail), log_tail) if warning is not None: return False return any(line.startswith("rsync error:") for line in log_tail) def _exit_code_from_log(log_tail: list[str]) -> int | None: for line in reversed(log_tail): if "code 255" in line: return 255 if "code 24" in line: return 24 if "code 124" in line: return 124 if "code 12" in line: return 12 return None def _running_dry_run_timed_out(*, run: BackupRun, grace_seconds: int) -> bool: if run.started_at is None: return False result = run.result if isinstance(run.result, dict) else {} timeout_seconds = result.get("timeout_seconds") if not isinstance(timeout_seconds, int) or timeout_seconds <= 0: timeout_seconds = DEFAULT_DRY_RUN_TIMEOUT_SECONDS return timezone.now() >= run.started_at + timedelta(seconds=timeout_seconds + grace_seconds) def _running_worker_timed_out(*, run: BackupRun, stale_worker_seconds: int) -> bool: if stale_worker_seconds <= 0: return False result = run.result if isinstance(run.result, dict) else {} execution = result.get("execution") if isinstance(result.get("execution"), dict) else {} heartbeat_at = _parse_iso_datetime(execution.get("heartbeat_at")) if heartbeat_at is None: heartbeat_at = run.started_at if heartbeat_at is None: return False return timezone.now() >= heartbeat_at + timedelta(seconds=stale_worker_seconds) def _running_rsync_process_missing(*, run: BackupRun, grace_seconds: int) -> bool: if grace_seconds <= 0: return False result = run.result if isinstance(run.result, dict) else {} execution = result.get("execution") if isinstance(result.get("execution"), dict) else {} if execution.get("phase") != "rsync": return False rsync = result.get("rsync") if isinstance(result.get("rsync"), dict) else {} pid = rsync.get("pid") if not isinstance(pid, int) or pid <= 0: return False heartbeat_at = _parse_iso_datetime(execution.get("heartbeat_at")) or run.started_at if heartbeat_at is None or timezone.now() < heartbeat_at + timedelta(seconds=grace_seconds): return False return not _process_exists(pid) def _process_exists(pid: int) -> bool: try: os.kill(pid, 0) except ProcessLookupError: return False except PermissionError: return True return True def _parse_iso_datetime(value: object): if not isinstance(value, str) or not value: return None try: parsed = timezone.datetime.fromisoformat(value) except ValueError: return None if timezone.is_naive(parsed): return timezone.make_aware(parsed, timezone=datetime_timezone.utc) return parsed