Files
pobsync/src/pobsync_backend/snapshot_discovery.py
Peter van Arkel 1d90454109 (feature) improve snapshot discovery visibility in Django
Add a discovery preflight that reports the configured backup root, host
root, and snapshot directory counts before importing anything.

Show discovery status on host detail pages so missing mounts or mismatched
host directories are visible from the UI.

Warn clearly when discovery scans zero snapshots, including whether the
host backup directory is missing or simply empty.
2026-05-19 13:21:31 +02:00

204 lines
6.4 KiB
Python

from __future__ import annotations
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from pobsync.snapshot_meta import iter_snapshot_dirs, read_snapshot_meta, resolve_host_root
from .models import GlobalConfig, HostConfig, SnapshotRecord
def parse_snapshot_datetime(dirname: str, meta: dict[str, Any], key: str) -> datetime | None:
    """Return the timestamp stored under *key* in *meta*, if one can be parsed.

    A string value in the metadata is parsed first. When that yields nothing
    and *key* is ``"started_at"``, the part of *dirname* before the first
    ``"__"`` is parsed as a ``YYYYMMDD-HHMMSSZ`` stamp and returned as UTC.
    Any other key, or an unparsable directory name, gives ``None``.
    """
    raw = meta.get(key)
    from_meta = _parse_iso_z(raw) if isinstance(raw, str) else None
    if from_meta is not None:
        return from_meta

    # Only the start time can be recovered from the directory name.
    if key != "started_at":
        return None

    stamp = dirname.split("__", 1)[0]
    try:
        naive = datetime.strptime(stamp, "%Y%m%d-%H%M%SZ")
    except ValueError:
        return None
    return naive.replace(tzinfo=timezone.utc)
def discover_snapshots(
    *,
    host: HostConfig | None = None,
    global_config: GlobalConfig | None = None,
    kinds: list[str] | None = None,
) -> dict[str, Any]:
    """Scan snapshot directories on disk and upsert a record for each one.

    Args:
        host: When given, only this host is scanned. The ``enabled=True``
            filter still applies, so a disabled host yields no scans.
        global_config: Configuration providing ``backup_root``. When omitted,
            the ``GlobalConfig`` named ``"default"`` is loaded with
            ``objects.get()``.
        kinds: Snapshot kinds to scan; defaults to scheduled, manual and
            incomplete.

    Returns:
        A dict with ``ok`` (always ``True``) and the ``scanned``, ``created``
        and ``updated`` counters.
    """
    if not global_config:
        global_config = GlobalConfig.objects.get(name="default")

    enabled_hosts = HostConfig.objects.filter(enabled=True).order_by("host")
    if host is not None:
        enabled_hosts = enabled_hosts.filter(pk=host.pk)

    if not kinds:
        kinds = ["scheduled", "manual", "incomplete"]

    tally = {"scanned": 0, "created": 0, "updated": 0}
    for host_config in enabled_hosts:
        host_root = resolve_host_root(global_config.backup_root, host_config.host)
        for kind in kinds:
            for snapshot_dir in iter_snapshot_dirs(host_root, kind):
                _record, is_new = upsert_snapshot_record(
                    host=host_config,
                    kind=kind,
                    snapshot_dir=snapshot_dir,
                )
                tally["scanned"] += 1
                tally["created" if is_new else "updated"] += 1
        # Base links are resolved after all of this host's kinds are scanned,
        # because a base snapshot may only get its record later in the same pass.
        resolve_base_links(host=host_config)

    return {"ok": True, **tally}
def inspect_snapshot_discovery(
    *,
    host: HostConfig,
    global_config: GlobalConfig | None = None,
    kinds: list[str] | None = None,
) -> dict[str, Any]:
    """Report what a discovery run would find for *host*, without importing.

    Args:
        host: Host whose backup directory is inspected.
        global_config: Configuration providing ``backup_root``. When omitted,
            the ``GlobalConfig`` named ``"default"`` is looked up.
        kinds: Snapshot kinds to count; defaults to scheduled, manual and
            incomplete.

    Returns:
        A dict with ``ok``, ``reason``, ``message``, ``backup_root``,
        ``host_root``, ``host_root_exists``, ``kind_counts`` and
        ``total_candidates``. ``ok`` is ``False`` only when the default
        global config does not exist; otherwise ``reason`` is one of
        ``"missing_host_root"``, ``"no_snapshots"`` or ``"ready"``.
    """
    if not global_config:
        try:
            global_config = GlobalConfig.objects.get(name="default")
        except GlobalConfig.DoesNotExist:
            return {
                "ok": False,
                "reason": "missing_global_config",
                "message": "Create the default global config before discovering snapshots.",
                "backup_root": "",
                "host_root": "",
                "host_root_exists": False,
                "kind_counts": {},
                "total_candidates": 0,
            }

    if not kinds:
        kinds = ["scheduled", "manual", "incomplete"]

    host_root = resolve_host_root(global_config.backup_root, host.host)

    kind_counts = {}
    for kind in kinds:
        kind_counts[kind] = sum(1 for _ in iter_snapshot_dirs(host_root, kind))
    total_candidates = sum(kind_counts.values())

    host_root_exists = host_root.exists()
    if host_root_exists and total_candidates:
        reason = "ready"
        message = f"Found {total_candidates} snapshot directories below {host_root}."
    elif host_root_exists:
        reason = "no_snapshots"
        message = f"No snapshot directories found below {host_root}."
    else:
        reason = "missing_host_root"
        message = f"Host backup directory does not exist yet: {host_root}"

    return {
        "ok": True,
        "reason": reason,
        "message": message,
        "backup_root": str(global_config.backup_root),
        "host_root": str(host_root),
        "host_root_exists": host_root_exists,
        "kind_counts": kind_counts,
        "total_candidates": total_candidates,
    }
def upsert_snapshot_record(*, host: HostConfig, kind: str, snapshot_dir: Path) -> tuple[SnapshotRecord, bool]:
    """Create or refresh the ``SnapshotRecord`` for one snapshot directory.

    The record is keyed by ``(host, kind, snapshot_dir.name)``; every other
    field is taken from the snapshot's metadata and written via
    ``update_or_create``.

    Returns:
        The ``(record, created)`` pair returned by ``update_or_create``.
    """
    dirname = snapshot_dir.name
    meta = read_snapshot_meta(snapshot_dir)
    base_fields = _base_defaults_from_meta(meta)

    field_values: dict[str, Any] = {"path": str(snapshot_dir)}
    field_values.update(base_fields)
    # The base link is None when the base snapshot has no record yet.
    field_values["base"] = _resolve_base_record(
        host=host,
        kind=base_fields["base_kind"],
        dirname=base_fields["base_dirname"],
    )
    field_values["status"] = str(meta.get("status") or "")
    field_values["started_at"] = parse_snapshot_datetime(dirname, meta, "started_at")
    field_values["ended_at"] = parse_snapshot_datetime(dirname, meta, "ended_at")
    field_values["metadata"] = meta

    return SnapshotRecord.objects.update_or_create(
        host=host,
        kind=kind,
        dirname=dirname,
        defaults=field_values,
    )
def resolve_base_links(*, host: HostConfig | None = None) -> int:
    """Fill in missing ``base`` links on snapshot records.

    Looks at records that have a non-empty ``base_dirname`` but no ``base``
    set, optionally limited to *host*, and links each one whose base record
    can be found.

    Returns:
        The number of records that were linked and saved.
    """
    pending = SnapshotRecord.objects.exclude(base_dirname="").filter(base__isnull=True)
    if host is not None:
        pending = pending.filter(host=host)

    linked = 0
    for record in pending.select_related("host"):
        target = _resolve_base_record(
            host=record.host,
            kind=record.base_kind,
            dirname=record.base_dirname,
        )
        if target is not None:
            record.base = target
            # Only the link changed, so only that column is written.
            record.save(update_fields=["base"])
            linked += 1
    return linked
def infer_snapshot_kind(snapshot_path: Path) -> str:
    """Derive the snapshot kind from the name of the snapshot's parent directory.

    ``scheduled`` and ``manual`` map to themselves; ``.incomplete`` maps to
    ``"incomplete"``.

    Raises:
        ValueError: If the parent directory name is none of the above.
    """
    kind_by_parent = {
        "scheduled": "scheduled",
        "manual": "manual",
        ".incomplete": "incomplete",
    }
    kind = kind_by_parent.get(snapshot_path.parent.name)
    if kind is None:
        raise ValueError(f"Cannot infer snapshot kind from path: {snapshot_path}")
    return kind
def _base_defaults_from_meta(meta: dict[str, Any]) -> dict[str, Any]:
    """Extract the ``base_*`` record fields from a snapshot's metadata.

    Reads the ``"base"`` entry of *meta*; anything other than a dict is
    treated as empty. Each field is passed through ``_base_value``.
    """
    raw_base = meta.get("base")
    base_info = raw_base if isinstance(raw_base, dict) else {}

    # (record field, key inside the metadata "base" mapping)
    field_sources = (
        ("base_kind", "kind"),
        ("base_dirname", "dirname"),
        ("base_path", "path"),
        ("base_snapshot_id", "id"),
    )
    return {field: _base_value(base_info.get(source)) for field, source in field_sources}
def _base_value(value: Any) -> str:
    """Return *value* unchanged when it is a string, otherwise ``""``."""
    if not isinstance(value, str):
        return ""
    return value
def _resolve_base_record(*, host: HostConfig, kind: str, dirname: str) -> SnapshotRecord | None:
    """Look up the record a snapshot names as its base.

    Returns ``None`` without querying when *kind* or *dirname* is empty;
    otherwise the first ``SnapshotRecord`` matching host, kind and dirname,
    or ``None`` if there is no match.
    """
    if not (kind and dirname):
        return None
    matches = SnapshotRecord.objects.filter(host=host, kind=kind, dirname=dirname)
    return matches.first()
def _parse_iso_z(value: str) -> datetime | None:
    """Parse an ISO 8601 timestamp into a timezone-aware datetime.

    A trailing ``"Z"`` is rewritten to ``"+00:00"`` before parsing. A
    timestamp that carries no offset is given UTC. Returns ``None`` when the
    text cannot be parsed.
    """
    text = value.removesuffix("Z") + "+00:00" if value.endswith("Z") else value
    try:
        moment = datetime.fromisoformat(text)
    except ValueError:
        return None
    if moment.tzinfo is None:
        moment = moment.replace(tzinfo=timezone.utc)
    return moment