from __future__ import annotations from pathlib import Path from django.db import transaction from django.utils import timezone from pobsync.commands.run_scheduled import dry_run_log_path, run_scheduled from pobsync_backend.config_source import DjangoConfigSource from pobsync_backend.models import BackupRun, HostConfig from pobsync_backend.retention import run_sql_retention_apply from pobsync_backend.snapshot_discovery import infer_snapshot_kind, upsert_snapshot_record def queue_backup_run( *, host: HostConfig, run_type: str = BackupRun.RunType.MANUAL, dry_run: bool = False, prune: bool = False, prune_max_delete: int = 10, prune_protect_bases: bool = False, ) -> BackupRun: return BackupRun.objects.create( host=host, run_type=run_type, status=BackupRun.Status.QUEUED, result={ "requested": { "dry_run": bool(dry_run), "prune": bool(prune), "prune_max_delete": int(prune_max_delete), "prune_protect_bases": bool(prune_protect_bases), } }, ) def execute_backup_run( *, run: BackupRun, prefix: Path, dry_run: bool = False, prune: bool = False, prune_max_delete: int = 10, prune_protect_bases: bool = False, ) -> BackupRun: run.status = BackupRun.Status.RUNNING run.started_at = run.started_at or timezone.now() run.result = _running_result(run=run, dry_run=bool(dry_run)) run.save(update_fields=["status", "started_at", "result"]) try: result = run_scheduled( prefix=prefix, host=run.host.host, dry_run=bool(dry_run), prune=False, config_source=DjangoConfigSource(), run_id=run.id, cancel_check=lambda: _run_cancel_requested(run.id), ) except Exception as exc: run.refresh_from_db() run.status = BackupRun.Status.CANCELLED if run.status == BackupRun.Status.CANCELLED else BackupRun.Status.FAILED run.ended_at = timezone.now() run.result = { **(run.result if isinstance(run.result, dict) else {}), "ok": False, "error": str(exc), "type": type(exc).__name__, } run.save(update_fields=["status", "ended_at", "result"]) raise run.refresh_from_db() if result.get("cancelled") or run.status == BackupRun.Status.CANCELLED: run.status = BackupRun.Status.CANCELLED else: run.status = BackupRun.Status.SUCCESS if result.get("ok") else BackupRun.Status.FAILED run.ended_at = timezone.now() run.snapshot_path = str(result.get("snapshot") or "") run.base_path = str(result.get("base") or "") rsync = result.get("rsync") if isinstance(result.get("rsync"), dict) else {} run.rsync_exit_code = rsync.get("exit_code") run.result = result snapshot_record = None if run.snapshot_path: snapshot_path = Path(run.snapshot_path) try: kind = infer_snapshot_kind(snapshot_path) snapshot_record, _created = upsert_snapshot_record(host=run.host, kind=kind, snapshot_dir=snapshot_path) except ValueError: snapshot_record = None if result.get("ok") and not result.get("dry_run") and prune: try: result["prune"] = run_sql_retention_apply( prefix=prefix, host=run.host.host, kind="scheduled", protect_bases=bool(prune_protect_bases), yes=True, max_delete=int(prune_max_delete), acquire_lock=False, ) except Exception as exc: result["prune"] = {"ok": False, "error": str(exc), "type": type(exc).__name__} run.status = BackupRun.Status.FAILED run.result = result run.snapshot = snapshot_record run.save( update_fields=[ "status", "ended_at", "snapshot_path", "snapshot", "base_path", "rsync_exit_code", "result", ], ) raise run.snapshot = snapshot_record run.result = result run.save( update_fields=[ "status", "ended_at", "snapshot_path", "snapshot", "base_path", "rsync_exit_code", "result", ], ) return run def claim_next_queued_run() -> BackupRun | None: with transaction.atomic(): run = ( BackupRun.objects.select_related("host") .filter(status=BackupRun.Status.QUEUED, host__enabled=True) .order_by("created_at", "id") .first() ) if run is None: return None run.status = BackupRun.Status.RUNNING run.started_at = timezone.now() run.save(update_fields=["status", "started_at"]) return run def requested_options(run: BackupRun) -> dict[str, object]: requested = run.result.get("requested") if isinstance(run.result, dict) else None if not isinstance(requested, dict): return {} return requested def _running_result(*, run: BackupRun, dry_run: bool) -> dict[str, object]: result = dict(run.result) if isinstance(run.result, dict) else {} execution = { "started_at": (run.started_at or timezone.now()).isoformat(), } if dry_run: execution["log"] = str(dry_run_log_path(run.host.host, run_id=run.id)) result["execution"] = execution return result def _run_cancel_requested(run_id: int) -> bool: return BackupRun.objects.filter(id=run_id, status=BackupRun.Status.CANCELLED).exists()