Files
pobsync/src/pobsync_backend/api.py
Peter van Arkel 2778a589ea (feature) Add staff-only service status API
Add /api/status/ for quick inspection of database backend, object counts, latest
backup run, and latest scheduler activity. Link it from the API index and reuse
schedule serialization between host summaries and status output.

Cover the endpoint with a focused API test and document the new status URL.
2026-05-19 11:46:22 +02:00

195 lines
6.6 KiB
Python

from __future__ import annotations
from typing import Any
from django.contrib.admin.views.decorators import staff_member_required
from django.db import connection
from django.db.models import Count
from django.http import JsonResponse
from django.utils import timezone
from .models import BackupRun, HostConfig, ScheduleConfig, SnapshotRecord
@staff_member_required
def api_index(request) -> JsonResponse:
return JsonResponse(
{
"ok": True,
"endpoints": {
"status": request.build_absolute_uri("/api/status/"),
"hosts": request.build_absolute_uri("/api/hosts/"),
"snapshots": request.build_absolute_uri("/api/snapshots/"),
"runs": request.build_absolute_uri("/api/runs/"),
},
}
)
@staff_member_required
def status(request) -> JsonResponse:
latest_run = BackupRun.objects.select_related("host", "snapshot").order_by("-created_at").first()
latest_schedule = ScheduleConfig.objects.select_related("host").order_by("-last_started_at", "-updated_at").first()
return JsonResponse(
{
"ok": True,
"generated_at": timezone.now().isoformat(),
"database": {
"vendor": connection.vendor,
"engine": connection.settings_dict["ENGINE"],
},
"counts": {
"hosts": HostConfig.objects.count(),
"enabled_hosts": HostConfig.objects.filter(enabled=True).count(),
"schedules": ScheduleConfig.objects.count(),
"enabled_schedules": ScheduleConfig.objects.filter(enabled=True).count(),
"snapshots": SnapshotRecord.objects.count(),
"runs": BackupRun.objects.count(),
"running_runs": BackupRun.objects.filter(status=BackupRun.Status.RUNNING).count(),
"failed_runs": BackupRun.objects.filter(status=BackupRun.Status.FAILED).count(),
},
"latest_run": None if latest_run is None else _run_payload(latest_run),
"latest_schedule": None if latest_schedule is None else _schedule_payload(latest_schedule),
}
)
@staff_member_required
def hosts(request) -> JsonResponse:
host_qs = (
HostConfig.objects.annotate(snapshot_count=Count("snapshots", distinct=True), run_count=Count("runs", distinct=True))
.select_related("schedule")
.order_by("host")
)
return JsonResponse({"ok": True, "hosts": [_host_payload(host) for host in host_qs]})
@staff_member_required
def snapshots(request) -> JsonResponse:
snapshot_qs = SnapshotRecord.objects.select_related("host", "base").order_by("host__host", "-started_at", "dirname")
host_filter = request.GET.get("host")
kind_filter = request.GET.get("kind")
if host_filter:
snapshot_qs = snapshot_qs.filter(host__host=host_filter)
if kind_filter:
snapshot_qs = snapshot_qs.filter(kind=kind_filter)
limit = _limit_from_request(request)
return JsonResponse({"ok": True, "snapshots": [_snapshot_payload(snapshot) for snapshot in snapshot_qs[:limit]]})
@staff_member_required
def runs(request) -> JsonResponse:
run_qs = BackupRun.objects.select_related("host", "snapshot").order_by("-created_at")
host_filter = request.GET.get("host")
status_filter = request.GET.get("status")
if host_filter:
run_qs = run_qs.filter(host__host=host_filter)
if status_filter:
run_qs = run_qs.filter(status=status_filter)
limit = _limit_from_request(request)
return JsonResponse({"ok": True, "runs": [_run_payload(run) for run in run_qs[:limit]]})
def _host_payload(host: HostConfig) -> dict[str, Any]:
try:
schedule = host.schedule
except ScheduleConfig.DoesNotExist:
schedule = None
return {
"host": host.host,
"address": host.address,
"enabled": host.enabled,
"snapshot_count": host.snapshot_count,
"run_count": host.run_count,
"retention": {
"daily": host.retention_daily,
"weekly": host.retention_weekly,
"monthly": host.retention_monthly,
"yearly": host.retention_yearly,
},
"schedule": None
if schedule is None
else _schedule_payload(schedule),
}
def _snapshot_payload(snapshot: SnapshotRecord) -> dict[str, Any]:
return {
"host": snapshot.host.host,
"kind": snapshot.kind,
"dirname": snapshot.dirname,
"path": snapshot.path,
"status": snapshot.status,
"started_at": _iso(snapshot.started_at),
"ended_at": _iso(snapshot.ended_at),
"discovered_at": _iso(snapshot.discovered_at),
"base": _base_payload(snapshot),
}
def _base_payload(snapshot: SnapshotRecord) -> dict[str, Any] | None:
if snapshot.base is not None:
return {
"kind": snapshot.base.kind,
"dirname": snapshot.base.dirname,
"path": snapshot.base.path,
"resolved": True,
}
if snapshot.base_kind and snapshot.base_dirname:
return {
"kind": snapshot.base_kind,
"dirname": snapshot.base_dirname,
"path": snapshot.base_path,
"snapshot_id": snapshot.base_snapshot_id,
"resolved": False,
}
return None
def _run_payload(run: BackupRun) -> dict[str, Any]:
return {
"id": run.pk,
"host": run.host.host,
"run_type": run.run_type,
"status": run.status,
"started_at": _iso(run.started_at),
"ended_at": _iso(run.ended_at),
"snapshot": None
if run.snapshot is None
else {
"kind": run.snapshot.kind,
"dirname": run.snapshot.dirname,
"path": run.snapshot.path,
},
"snapshot_path": run.snapshot_path,
"base_path": run.base_path,
"rsync_exit_code": run.rsync_exit_code,
}
def _schedule_payload(schedule: ScheduleConfig) -> dict[str, Any]:
return {
"host": schedule.host.host,
"cron_expr": schedule.cron_expr,
"enabled": schedule.enabled,
"prune": schedule.prune,
"last_due_key": schedule.last_due_key,
"last_status": schedule.last_status,
"last_started_at": _iso(schedule.last_started_at),
"last_finished_at": _iso(schedule.last_finished_at),
}
def _limit_from_request(request, *, default: int = 100, maximum: int = 500) -> int:
value = request.GET.get("limit", str(default))
try:
limit = int(value)
except ValueError:
return default
return max(1, min(limit, maximum))
def _iso(value) -> str | None:
return value.isoformat() if value is not None else None