from __future__ import annotations

from datetime import datetime, timezone
from pathlib import Path
from typing import Any

from pobsync.snapshot_meta import iter_snapshot_dirs, read_snapshot_meta, resolve_host_root

from .models import GlobalConfig, HostConfig, SnapshotRecord


def parse_snapshot_datetime(dirname: str, meta: dict[str, Any], key: str) -> datetime | None:
    """Return the timestamp stored under ``key`` in ``meta``, if usable.

    A string value that parses as ISO 8601 wins.  When that is not
    available and ``key`` is ``"started_at"``, the part of ``dirname``
    before the first ``"__"`` is parsed as ``YYYYMMDD-HHMMSSZ`` and
    returned as a UTC datetime.  In every other case the result is ``None``.
    """
    raw = meta.get(key)
    from_meta = _parse_iso_z(raw) if isinstance(raw, str) else None
    if from_meta is not None:
        return from_meta

    # Only the start time has a directory-name fallback.
    if key != "started_at":
        return None

    stamp = dirname.partition("__")[0]
    try:
        naive = datetime.strptime(stamp, "%Y%m%d-%H%M%SZ")
    except ValueError:
        return None
    return naive.replace(tzinfo=timezone.utc)


def discover_snapshots(
    *,
    host: HostConfig | None = None,
    global_config: GlobalConfig | None = None,
    kinds: list[str] | None = None,
) -> dict[str, Any]:
    """Scan snapshot directories and upsert a ``SnapshotRecord`` for each.

    Args:
        host: When given, only this host is scanned (it must still be
            enabled to match the query).
        global_config: Configuration supplying ``backup_root``; when falsy,
            the ``GlobalConfig`` named ``"default"`` is loaded.
        kinds: Snapshot kinds to scan; when falsy, ``scheduled``,
            ``manual`` and ``incomplete`` are scanned in that order.

    Returns:
        A dict with ``ok`` (always ``True``), ``scanned``, ``created`` and
        ``updated`` counters.
    """
    if not global_config:
        global_config = GlobalConfig.objects.get(name="default")

    enabled_hosts = HostConfig.objects.filter(enabled=True).order_by("host")
    if host is not None:
        enabled_hosts = enabled_hosts.filter(pk=host.pk)

    if not kinds:
        kinds = ["scheduled", "manual", "incomplete"]

    tally = {"created": 0, "updated": 0}
    for host_config in enabled_hosts:
        host_root = resolve_host_root(global_config.backup_root, host_config.host)
        for kind in kinds:
            for snapshot_dir in iter_snapshot_dirs(host_root, kind):
                _record, was_created = upsert_snapshot_record(
                    host=host_config,
                    kind=kind,
                    snapshot_dir=snapshot_dir,
                )
                tally["created" if was_created else "updated"] += 1
        # Runs once per host after all kinds were scanned -- presumably so a
        # base that was upserted after its dependants can still be linked.
        resolve_base_links(host=host_config)

    return {
        "ok": True,
        "scanned": tally["created"] + tally["updated"],
        "created": tally["created"],
        "updated": tally["updated"],
    }


def upsert_snapshot_record(*, host: HostConfig, kind: str, snapshot_dir: Path) -> tuple[SnapshotRecord, bool]:
    """Create or update the ``SnapshotRecord`` for one snapshot directory.

    The record is keyed by ``(host, kind, snapshot_dir.name)``; every other
    field is refreshed from the snapshot's metadata.

    Returns:
        The ``(record, created)`` pair produced by ``update_or_create``.
    """
    meta = read_snapshot_meta(snapshot_dir)
    dirname = snapshot_dir.name
    base_fields = _base_defaults_from_meta(meta)

    field_values: dict[str, Any] = {"path": str(snapshot_dir)}
    field_values.update(base_fields)
    field_values["base"] = _resolve_base_record(
        host=host,
        kind=base_fields["base_kind"],
        dirname=base_fields["base_dirname"],
    )
    field_values["status"] = str(meta.get("status") or "")
    for stamp_key in ("started_at", "ended_at"):
        field_values[stamp_key] = parse_snapshot_datetime(dirname, meta, stamp_key)
    field_values["metadata"] = meta

    return SnapshotRecord.objects.update_or_create(
        host=host,
        kind=kind,
        dirname=dirname,
        defaults=field_values,
    )


def resolve_base_links(*, host: HostConfig | None = None) -> int:
    """Fill in ``base`` on records that name a base but are not linked yet.

    Considers records with a non-empty ``base_dirname`` and a null
    ``base``, optionally limited to ``host``.

    Returns:
        The number of records whose ``base`` was set and saved.
    """
    unlinked = SnapshotRecord.objects.exclude(base_dirname="").filter(base__isnull=True)
    if host is not None:
        unlinked = unlinked.filter(host=host)

    linked_count = 0
    for record in unlinked.select_related("host"):
        target = _resolve_base_record(
            host=record.host,
            kind=record.base_kind,
            dirname=record.base_dirname,
        )
        if target is not None:
            record.base = target
            record.save(update_fields=["base"])
            linked_count += 1
    return linked_count


def infer_snapshot_kind(snapshot_path: Path) -> str:
    """Map the name of the snapshot's parent directory to a snapshot kind.

    Raises:
        ValueError: If the parent directory is not ``scheduled``,
            ``manual`` or ``.incomplete``.
    """
    kind_by_parent = {
        "scheduled": "scheduled",
        "manual": "manual",
        ".incomplete": "incomplete",
    }
    kind = kind_by_parent.get(snapshot_path.parent.name)
    if kind is None:
        raise ValueError(f"Cannot infer snapshot kind from path: {snapshot_path}")
    return kind


def _base_defaults_from_meta(meta: dict[str, Any]) -> dict[str, Any]:
    """Extract the ``base_*`` field values from ``meta["base"]``.

    A missing or non-dict ``base`` entry yields empty strings for all four
    fields, as does any individual value that is not a string.
    """
    section = meta.get("base")
    if not isinstance(section, dict):
        section = {}

    # (record field, key inside the metadata's "base" section)
    field_sources = (
        ("base_kind", "kind"),
        ("base_dirname", "dirname"),
        ("base_path", "path"),
        ("base_snapshot_id", "id"),
    )
    return {field: _base_value(section.get(source)) for field, source in field_sources}


def _base_value(value: Any) -> str:
    """Return ``value`` unchanged when it is a string, otherwise ``""``."""
    if isinstance(value, str):
        return value
    return ""


def _resolve_base_record(*, host: HostConfig, kind: str, dirname: str) -> SnapshotRecord | None:
    """Look up the record for ``(host, kind, dirname)``.

    Returns ``None`` without querying when ``kind`` or ``dirname`` is
    empty, and ``None`` when no record matches.
    """
    if not (kind and dirname):
        return None
    matches = SnapshotRecord.objects.filter(host=host, kind=kind, dirname=dirname)
    return matches.first()


def _parse_iso_z(value: str) -> datetime | None:
    """Parse an ISO 8601 string into a timezone-aware datetime.

    A trailing ``"Z"`` is rewritten to ``"+00:00"`` before parsing.  A
    value without any offset is treated as UTC.  Returns ``None`` when the
    string cannot be parsed.
    """
    utc_suffixed = value.endswith("Z")
    iso_text = (value[:-1] + "+00:00") if utc_suffixed else value
    try:
        moment = datetime.fromisoformat(iso_text)
    except ValueError:
        return None

    if utc_suffixed or moment.tzinfo is not None:
        return moment
    return moment.replace(tzinfo=timezone.utc)