feat: discover snapshots into Django records
Add a Django-native snapshot discovery service and management command that scans backup directories, reads snapshot metadata, and idempotently upserts SnapshotRecord rows. Expose it through the pobsync command wrapper, update admin/docs, and cover discovery behavior with tests.
This commit is contained in:
85
src/pobsync_backend/snapshot_discovery.py
Normal file
85
src/pobsync_backend/snapshot_discovery.py
Normal file
@@ -0,0 +1,85 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
from pobsync.snapshot_meta import iter_snapshot_dirs, read_snapshot_meta, resolve_host_root
|
||||
|
||||
from .models import GlobalConfig, HostConfig, SnapshotRecord
|
||||
|
||||
|
||||
def parse_snapshot_datetime(dirname: str, meta: dict[str, Any], key: str) -> datetime | None:
|
||||
value = meta.get(key)
|
||||
if isinstance(value, str):
|
||||
parsed = _parse_iso_z(value)
|
||||
if parsed is not None:
|
||||
return parsed
|
||||
|
||||
if key == "started_at":
|
||||
try:
|
||||
prefix = dirname.split("__", 1)[0]
|
||||
return datetime.strptime(prefix, "%Y%m%d-%H%M%SZ").replace(tzinfo=timezone.utc)
|
||||
except ValueError:
|
||||
return None
|
||||
return None
|
||||
|
||||
|
||||
def discover_snapshots(
|
||||
*,
|
||||
host: HostConfig | None = None,
|
||||
global_config: GlobalConfig | None = None,
|
||||
kinds: list[str] | None = None,
|
||||
) -> dict[str, Any]:
|
||||
global_config = global_config or GlobalConfig.objects.get(name="default")
|
||||
host_qs = HostConfig.objects.filter(enabled=True).order_by("host")
|
||||
if host is not None:
|
||||
host_qs = host_qs.filter(pk=host.pk)
|
||||
|
||||
kinds = kinds or ["scheduled", "manual", "incomplete"]
|
||||
scanned = 0
|
||||
created = 0
|
||||
updated = 0
|
||||
|
||||
for host_config in host_qs:
|
||||
host_root = resolve_host_root(global_config.backup_root, host_config.host)
|
||||
for kind in kinds:
|
||||
for snapshot_dir in iter_snapshot_dirs(host_root, kind):
|
||||
meta = read_snapshot_meta(snapshot_dir)
|
||||
defaults = {
|
||||
"path": str(snapshot_dir),
|
||||
"status": str(meta.get("status") or ""),
|
||||
"started_at": parse_snapshot_datetime(snapshot_dir.name, meta, "started_at"),
|
||||
"ended_at": parse_snapshot_datetime(snapshot_dir.name, meta, "ended_at"),
|
||||
"metadata": meta,
|
||||
}
|
||||
_record, was_created = SnapshotRecord.objects.update_or_create(
|
||||
host=host_config,
|
||||
kind=kind,
|
||||
dirname=snapshot_dir.name,
|
||||
defaults=defaults,
|
||||
)
|
||||
scanned += 1
|
||||
if was_created:
|
||||
created += 1
|
||||
else:
|
||||
updated += 1
|
||||
|
||||
return {
|
||||
"ok": True,
|
||||
"scanned": scanned,
|
||||
"created": created,
|
||||
"updated": updated,
|
||||
}
|
||||
|
||||
|
||||
def _parse_iso_z(value: str) -> datetime | None:
|
||||
try:
|
||||
if value.endswith("Z"):
|
||||
return datetime.fromisoformat(value.removesuffix("Z") + "+00:00")
|
||||
parsed = datetime.fromisoformat(value)
|
||||
if parsed.tzinfo is None:
|
||||
return parsed.replace(tzinfo=timezone.utc)
|
||||
return parsed
|
||||
except ValueError:
|
||||
return None
|
||||
Reference in New Issue
Block a user