Files
pobsync/src/pobsync_backend/snapshot_discovery.py

156 lines
4.8 KiB
Python

from __future__ import annotations
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from pobsync.snapshot_meta import iter_snapshot_dirs, read_snapshot_meta, resolve_host_root
from .models import GlobalConfig, HostConfig, SnapshotRecord
def parse_snapshot_datetime(dirname: str, meta: dict[str, Any], key: str) -> datetime | None:
    """Return the timestamp stored under *key* for a snapshot, if any.

    The metadata value is tried first: when ``meta[key]`` is a string that
    ``_parse_iso_z`` can parse, that result is returned. Only for the
    ``"started_at"`` key is there a fallback: the part of *dirname* before the
    first ``"__"`` is parsed as ``YYYYMMDD-HHMMSSZ`` and tagged as UTC.

    Args:
        dirname: Snapshot directory name used for the ``started_at`` fallback.
        meta: Snapshot metadata mapping.
        key: Metadata key to look up (for example ``"started_at"``).

    Returns:
        The parsed datetime, or ``None`` when neither source yields one.
    """
    raw = meta.get(key)
    from_meta = _parse_iso_z(raw) if isinstance(raw, str) else None
    if from_meta is not None:
        return from_meta

    # Only the start time is encoded in the directory name.
    if key != "started_at":
        return None

    stamp, _, _ = dirname.partition("__")
    try:
        naive = datetime.strptime(stamp, "%Y%m%d-%H%M%SZ")
    except ValueError:
        return None
    return naive.replace(tzinfo=timezone.utc)
def discover_snapshots(
    *,
    host: HostConfig | None = None,
    global_config: GlobalConfig | None = None,
    kinds: list[str] | None = None,
) -> dict[str, Any]:
    """Walk snapshot directories on disk and upsert a record for each one.

    Args:
        host: When given, only this host is scanned (it must still be enabled,
            because the queryset is restricted to ``enabled=True``).
        global_config: Configuration supplying ``backup_root``; when falsy the
            ``GlobalConfig`` named ``"default"`` is loaded.
        kinds: Snapshot kinds to scan; when falsy, ``scheduled``, ``manual``
            and ``incomplete`` are scanned in that order.

    Returns:
        A dict with ``ok`` (always ``True``), ``scanned``, ``created`` and
        ``updated`` counters.
    """
    if not global_config:
        global_config = GlobalConfig.objects.get(name="default")

    hosts = HostConfig.objects.filter(enabled=True).order_by("host")
    if host is not None:
        hosts = hosts.filter(pk=host.pk)

    wanted_kinds = kinds or ["scheduled", "manual", "incomplete"]

    tally = {"created": 0, "updated": 0}
    for host_config in hosts:
        root = resolve_host_root(global_config.backup_root, host_config.host)
        for kind in wanted_kinds:
            for snapshot_dir in iter_snapshot_dirs(root, kind):
                _, is_new = upsert_snapshot_record(
                    host=host_config,
                    kind=kind,
                    snapshot_dir=snapshot_dir,
                )
                tally["created" if is_new else "updated"] += 1
        # Link bases once every record of this host has been upserted, so a
        # base scanned after its dependent snapshot can still be attached.
        resolve_base_links(host=host_config)

    return {
        "ok": True,
        # Every scanned directory is counted as exactly one of the two below.
        "scanned": tally["created"] + tally["updated"],
        "created": tally["created"],
        "updated": tally["updated"],
    }
def upsert_snapshot_record(
    *, host: HostConfig, kind: str, snapshot_dir: Path
) -> tuple[SnapshotRecord, bool]:
    """Create or update the ``SnapshotRecord`` for one snapshot directory.

    The record is identified by ``(host, kind, snapshot_dir.name)``; all other
    fields are refreshed from the metadata returned by ``read_snapshot_meta``.

    Args:
        host: Host the snapshot belongs to.
        kind: Snapshot kind stored on the record.
        snapshot_dir: Directory of the snapshot on disk.

    Returns:
        The ``(record, created)`` pair produced by ``update_or_create``.
    """
    meta = read_snapshot_meta(snapshot_dir)
    dirname = snapshot_dir.name

    base_fields = _base_defaults_from_meta(meta)
    # May be None when the base has no record yet.
    linked_base = _resolve_base_record(
        host=host,
        kind=base_fields["base_kind"],
        dirname=base_fields["base_dirname"],
    )

    field_values: dict[str, Any] = {"path": str(snapshot_dir)}
    field_values.update(base_fields)
    field_values.update(
        {
            "base": linked_base,
            "status": str(meta.get("status") or ""),
            "started_at": parse_snapshot_datetime(dirname, meta, "started_at"),
            "ended_at": parse_snapshot_datetime(dirname, meta, "ended_at"),
            "metadata": meta,
        }
    )

    return SnapshotRecord.objects.update_or_create(
        host=host,
        kind=kind,
        dirname=dirname,
        defaults=field_values,
    )
def resolve_base_links(*, host: HostConfig | None = None) -> int:
    """Attach missing ``base`` links to snapshot records.

    Considers records that name a base (non-empty ``base_dirname``) but whose
    ``base`` relation is still null, and fills the relation in whenever a
    matching record exists for the same host.

    Args:
        host: When given, only records of this host are considered.

    Returns:
        The number of records whose ``base`` was set.
    """
    pending = SnapshotRecord.objects.exclude(base_dirname="").filter(base__isnull=True)
    if host is not None:
        pending = pending.filter(host=host)

    linked = 0
    for record in pending.select_related("host"):
        target = _resolve_base_record(
            host=record.host,
            kind=record.base_kind,
            dirname=record.base_dirname,
        )
        if target is not None:
            record.base = target
            # Write only the column that changed.
            record.save(update_fields=["base"])
            linked += 1
    return linked
def infer_snapshot_kind(snapshot_path: Path) -> str:
    """Derive the snapshot kind from the name of the snapshot's parent directory.

    Args:
        snapshot_path: Path of a snapshot directory.

    Returns:
        ``"scheduled"``, ``"manual"`` or ``"incomplete"`` (the latter for a
        parent directory named ``.incomplete``).

    Raises:
        ValueError: If the parent directory name is none of the known ones.
    """
    kind_by_parent = {
        "scheduled": "scheduled",
        "manual": "manual",
        ".incomplete": "incomplete",
    }
    kind = kind_by_parent.get(snapshot_path.parent.name)
    if kind is None:
        raise ValueError(f"Cannot infer snapshot kind from path: {snapshot_path}")
    return kind
def _base_defaults_from_meta(meta: dict[str, Any]) -> dict[str, Any]:
    """Extract the ``base_*`` field values from a snapshot's metadata.

    Reads the ``"base"`` entry of *meta*; anything other than a dict is treated
    as an empty one. Each value is passed through ``_base_value``.

    Args:
        meta: Snapshot metadata mapping.

    Returns:
        A dict with the keys ``base_kind``, ``base_dirname``, ``base_path`` and
        ``base_snapshot_id``.
    """
    raw_base = meta.get("base")
    base_info = raw_base if isinstance(raw_base, dict) else {}

    # (record field, key inside the "base" metadata entry)
    field_sources = (
        ("base_kind", "kind"),
        ("base_dirname", "dirname"),
        ("base_path", "path"),
        ("base_snapshot_id", "id"),
    )
    return {
        field: _base_value(base_info.get(source_key))
        for field, source_key in field_sources
    }
def _base_value(value: Any) -> str:
    """Return *value* unchanged when it is a string, otherwise ``""``."""
    if isinstance(value, str):
        return value
    return ""
def _resolve_base_record(*, host: HostConfig, kind: str, dirname: str) -> SnapshotRecord | None:
    """Look up the record of a base snapshot.

    Args:
        host: Host that owns the base snapshot.
        kind: Kind of the base snapshot.
        dirname: Directory name of the base snapshot.

    Returns:
        The first ``SnapshotRecord`` matching all three values, or ``None``
        when *kind* or *dirname* is empty or no record matches.
    """
    if kind and dirname:
        matches = SnapshotRecord.objects.filter(host=host, kind=kind, dirname=dirname)
        return matches.first()
    # Without both identifiers there is nothing to look up.
    return None
def _parse_iso_z(value: str) -> datetime | None:
    """Parse an ISO 8601 timestamp into a timezone-aware datetime.

    A trailing ``"Z"`` is rewritten to ``"+00:00"`` before parsing. Whatever
    the input form, a result without timezone information is tagged as UTC,
    so a non-``None`` return value is always timezone-aware.

    Args:
        value: Timestamp text, e.g. ``"2024-01-02T03:04:05Z"``.

    Returns:
        The parsed aware datetime, or ``None`` if *value* is not parseable.
    """
    text = value.removesuffix("Z") + "+00:00" if value.endswith("Z") else value
    try:
        parsed = datetime.fromisoformat(text)
    except ValueError:
        return None
    # Normalise on every path: for a date-only input such as "2024-01-01Z" the
    # "+" of the appended offset is read as the date/time separator, so the
    # parse result carries no tzinfo even though an offset was appended.
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed