(feature) Add staff-only service status API

Add /api/status/ for quick inspection of database backend, object counts, latest
backup run, and latest scheduler activity. Link it from the API index and reuse
schedule serialization between host summaries and status output.

Cover the endpoint with a focused API test and document the new status URL.
This commit is contained in:
2026-05-19 11:46:22 +02:00
parent ccd89119da
commit 2778a589ea
4 changed files with 88 additions and 9 deletions

View File

@@ -3,8 +3,10 @@ from __future__ import annotations
from typing import Any
from django.contrib.admin.views.decorators import staff_member_required
from django.db import connection
from django.db.models import Count
from django.http import JsonResponse
from django.utils import timezone
from .models import BackupRun, HostConfig, ScheduleConfig, SnapshotRecord
@@ -15,6 +17,7 @@ def api_index(request) -> JsonResponse:
{
"ok": True,
"endpoints": {
"status": request.build_absolute_uri("/api/status/"),
"hosts": request.build_absolute_uri("/api/hosts/"),
"snapshots": request.build_absolute_uri("/api/snapshots/"),
"runs": request.build_absolute_uri("/api/runs/"),
@@ -23,6 +26,35 @@ def api_index(request) -> JsonResponse:
)
@staff_member_required
def status(request) -> JsonResponse:
latest_run = BackupRun.objects.select_related("host", "snapshot").order_by("-created_at").first()
latest_schedule = ScheduleConfig.objects.select_related("host").order_by("-last_started_at", "-updated_at").first()
return JsonResponse(
{
"ok": True,
"generated_at": timezone.now().isoformat(),
"database": {
"vendor": connection.vendor,
"engine": connection.settings_dict["ENGINE"],
},
"counts": {
"hosts": HostConfig.objects.count(),
"enabled_hosts": HostConfig.objects.filter(enabled=True).count(),
"schedules": ScheduleConfig.objects.count(),
"enabled_schedules": ScheduleConfig.objects.filter(enabled=True).count(),
"snapshots": SnapshotRecord.objects.count(),
"runs": BackupRun.objects.count(),
"running_runs": BackupRun.objects.filter(status=BackupRun.Status.RUNNING).count(),
"failed_runs": BackupRun.objects.filter(status=BackupRun.Status.FAILED).count(),
},
"latest_run": None if latest_run is None else _run_payload(latest_run),
"latest_schedule": None if latest_schedule is None else _schedule_payload(latest_schedule),
}
)
@staff_member_required
def hosts(request) -> JsonResponse:
host_qs = (
@@ -78,14 +110,7 @@ def _host_payload(host: HostConfig) -> dict[str, Any]:
},
"schedule": None
if schedule is None
else {
"cron_expr": schedule.cron_expr,
"enabled": schedule.enabled,
"prune": schedule.prune,
"last_status": schedule.last_status,
"last_started_at": _iso(schedule.last_started_at),
"last_finished_at": _iso(schedule.last_finished_at),
},
else _schedule_payload(schedule),
}
@@ -143,6 +168,19 @@ def _run_payload(run: BackupRun) -> dict[str, Any]:
}
def _schedule_payload(schedule: ScheduleConfig) -> dict[str, Any]:
return {
"host": schedule.host.host,
"cron_expr": schedule.cron_expr,
"enabled": schedule.enabled,
"prune": schedule.prune,
"last_due_key": schedule.last_due_key,
"last_status": schedule.last_status,
"last_started_at": _iso(schedule.last_started_at),
"last_finished_at": _iso(schedule.last_finished_at),
}
def _limit_from_request(request, *, default: int = 100, maximum: int = 500) -> int:
value = request.GET.get("limit", str(default))
try: