(feature) Add staff-only JSON inspection API
Expose lightweight Django JSON endpoints for hosts, snapshots, and backup runs using the existing admin/staff authentication boundary. Include filters for snapshot and run inspection, return resolved snapshot base metadata, and document the new /api/ entrypoint. Add endpoint tests for authentication, host summaries, snapshot lineage payloads, and run filtering.
This commit is contained in:
156
src/pobsync_backend/api.py
Normal file
156
src/pobsync_backend/api.py
Normal file
@@ -0,0 +1,156 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Any
|
||||
|
||||
from django.contrib.admin.views.decorators import staff_member_required
|
||||
from django.db.models import Count
|
||||
from django.http import JsonResponse
|
||||
|
||||
from .models import BackupRun, HostConfig, ScheduleConfig, SnapshotRecord
|
||||
|
||||
|
||||
@staff_member_required
|
||||
def api_index(request) -> JsonResponse:
|
||||
return JsonResponse(
|
||||
{
|
||||
"ok": True,
|
||||
"endpoints": {
|
||||
"hosts": request.build_absolute_uri("/api/hosts/"),
|
||||
"snapshots": request.build_absolute_uri("/api/snapshots/"),
|
||||
"runs": request.build_absolute_uri("/api/runs/"),
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
@staff_member_required
|
||||
def hosts(request) -> JsonResponse:
|
||||
host_qs = (
|
||||
HostConfig.objects.annotate(snapshot_count=Count("snapshots", distinct=True), run_count=Count("runs", distinct=True))
|
||||
.select_related("schedule")
|
||||
.order_by("host")
|
||||
)
|
||||
return JsonResponse({"ok": True, "hosts": [_host_payload(host) for host in host_qs]})
|
||||
|
||||
|
||||
@staff_member_required
|
||||
def snapshots(request) -> JsonResponse:
|
||||
snapshot_qs = SnapshotRecord.objects.select_related("host", "base").order_by("host__host", "-started_at", "dirname")
|
||||
host_filter = request.GET.get("host")
|
||||
kind_filter = request.GET.get("kind")
|
||||
if host_filter:
|
||||
snapshot_qs = snapshot_qs.filter(host__host=host_filter)
|
||||
if kind_filter:
|
||||
snapshot_qs = snapshot_qs.filter(kind=kind_filter)
|
||||
limit = _limit_from_request(request)
|
||||
return JsonResponse({"ok": True, "snapshots": [_snapshot_payload(snapshot) for snapshot in snapshot_qs[:limit]]})
|
||||
|
||||
|
||||
@staff_member_required
|
||||
def runs(request) -> JsonResponse:
|
||||
run_qs = BackupRun.objects.select_related("host", "snapshot").order_by("-created_at")
|
||||
host_filter = request.GET.get("host")
|
||||
status_filter = request.GET.get("status")
|
||||
if host_filter:
|
||||
run_qs = run_qs.filter(host__host=host_filter)
|
||||
if status_filter:
|
||||
run_qs = run_qs.filter(status=status_filter)
|
||||
limit = _limit_from_request(request)
|
||||
return JsonResponse({"ok": True, "runs": [_run_payload(run) for run in run_qs[:limit]]})
|
||||
|
||||
|
||||
def _host_payload(host: HostConfig) -> dict[str, Any]:
|
||||
try:
|
||||
schedule = host.schedule
|
||||
except ScheduleConfig.DoesNotExist:
|
||||
schedule = None
|
||||
return {
|
||||
"host": host.host,
|
||||
"address": host.address,
|
||||
"enabled": host.enabled,
|
||||
"snapshot_count": host.snapshot_count,
|
||||
"run_count": host.run_count,
|
||||
"retention": {
|
||||
"daily": host.retention_daily,
|
||||
"weekly": host.retention_weekly,
|
||||
"monthly": host.retention_monthly,
|
||||
"yearly": host.retention_yearly,
|
||||
},
|
||||
"schedule": None
|
||||
if schedule is None
|
||||
else {
|
||||
"cron_expr": schedule.cron_expr,
|
||||
"enabled": schedule.enabled,
|
||||
"prune": schedule.prune,
|
||||
"last_status": schedule.last_status,
|
||||
"last_started_at": _iso(schedule.last_started_at),
|
||||
"last_finished_at": _iso(schedule.last_finished_at),
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
def _snapshot_payload(snapshot: SnapshotRecord) -> dict[str, Any]:
|
||||
return {
|
||||
"host": snapshot.host.host,
|
||||
"kind": snapshot.kind,
|
||||
"dirname": snapshot.dirname,
|
||||
"path": snapshot.path,
|
||||
"status": snapshot.status,
|
||||
"started_at": _iso(snapshot.started_at),
|
||||
"ended_at": _iso(snapshot.ended_at),
|
||||
"discovered_at": _iso(snapshot.discovered_at),
|
||||
"base": _base_payload(snapshot),
|
||||
}
|
||||
|
||||
|
||||
def _base_payload(snapshot: SnapshotRecord) -> dict[str, Any] | None:
|
||||
if snapshot.base is not None:
|
||||
return {
|
||||
"kind": snapshot.base.kind,
|
||||
"dirname": snapshot.base.dirname,
|
||||
"path": snapshot.base.path,
|
||||
"resolved": True,
|
||||
}
|
||||
if snapshot.base_kind and snapshot.base_dirname:
|
||||
return {
|
||||
"kind": snapshot.base_kind,
|
||||
"dirname": snapshot.base_dirname,
|
||||
"path": snapshot.base_path,
|
||||
"snapshot_id": snapshot.base_snapshot_id,
|
||||
"resolved": False,
|
||||
}
|
||||
return None
|
||||
|
||||
|
||||
def _run_payload(run: BackupRun) -> dict[str, Any]:
|
||||
return {
|
||||
"id": run.pk,
|
||||
"host": run.host.host,
|
||||
"run_type": run.run_type,
|
||||
"status": run.status,
|
||||
"started_at": _iso(run.started_at),
|
||||
"ended_at": _iso(run.ended_at),
|
||||
"snapshot": None
|
||||
if run.snapshot is None
|
||||
else {
|
||||
"kind": run.snapshot.kind,
|
||||
"dirname": run.snapshot.dirname,
|
||||
"path": run.snapshot.path,
|
||||
},
|
||||
"snapshot_path": run.snapshot_path,
|
||||
"base_path": run.base_path,
|
||||
"rsync_exit_code": run.rsync_exit_code,
|
||||
}
|
||||
|
||||
|
||||
def _limit_from_request(request, *, default: int = 100, maximum: int = 500) -> int:
|
||||
value = request.GET.get("limit", str(default))
|
||||
try:
|
||||
limit = int(value)
|
||||
except ValueError:
|
||||
return default
|
||||
return max(1, min(limit, maximum))
|
||||
|
||||
|
||||
def _iso(value) -> str | None:
|
||||
return value.isoformat() if value is not None else None
|
||||
Reference in New Issue
Block a user