from __future__ import annotations from datetime import datetime, timezone from pathlib import Path from typing import Any from pobsync.snapshot_meta import iter_snapshot_dirs, read_snapshot_meta, resolve_host_root from .models import GlobalConfig, HostConfig, SnapshotRecord def parse_snapshot_datetime(dirname: str, meta: dict[str, Any], key: str) -> datetime | None: value = meta.get(key) if isinstance(value, str): parsed = _parse_iso_z(value) if parsed is not None: return parsed if key == "started_at": try: prefix = dirname.split("__", 1)[0] return datetime.strptime(prefix, "%Y%m%d-%H%M%SZ").replace(tzinfo=timezone.utc) except ValueError: return None return None def discover_snapshots( *, host: HostConfig | None = None, global_config: GlobalConfig | None = None, kinds: list[str] | None = None, ) -> dict[str, Any]: global_config = global_config or GlobalConfig.objects.get(name="default") host_qs = HostConfig.objects.filter(enabled=True).order_by("host") if host is not None: host_qs = host_qs.filter(pk=host.pk) kinds = kinds or ["scheduled", "manual", "incomplete"] scanned = 0 created = 0 updated = 0 for host_config in host_qs: host_root = resolve_host_root(global_config.backup_root, host_config.host) for kind in kinds: for snapshot_dir in iter_snapshot_dirs(host_root, kind): _record, was_created = upsert_snapshot_record( host=host_config, kind=kind, snapshot_dir=snapshot_dir, ) scanned += 1 if was_created: created += 1 else: updated += 1 return { "ok": True, "scanned": scanned, "created": created, "updated": updated, } def upsert_snapshot_record(*, host: HostConfig, kind: str, snapshot_dir: Path) -> tuple[SnapshotRecord, bool]: meta = read_snapshot_meta(snapshot_dir) defaults = { "path": str(snapshot_dir), "status": str(meta.get("status") or ""), "started_at": parse_snapshot_datetime(snapshot_dir.name, meta, "started_at"), "ended_at": parse_snapshot_datetime(snapshot_dir.name, meta, "ended_at"), "metadata": meta, } return SnapshotRecord.objects.update_or_create( host=host, kind=kind, dirname=snapshot_dir.name, defaults=defaults, ) def infer_snapshot_kind(snapshot_path: Path) -> str: parent = snapshot_path.parent.name if parent == "scheduled": return "scheduled" if parent == "manual": return "manual" if parent == ".incomplete": return "incomplete" raise ValueError(f"Cannot infer snapshot kind from path: {snapshot_path}") def _parse_iso_z(value: str) -> datetime | None: try: if value.endswith("Z"): return datetime.fromisoformat(value.removesuffix("Z") + "+00:00") parsed = datetime.fromisoformat(value) if parsed.tzinfo is None: return parsed.replace(tzinfo=timezone.utc) return parsed except ValueError: return None