2 Commits

Author SHA1 Message Date
2778a589ea (feature) Add staff-only service status API
Add /api/status/ for quick inspection of database backend, object counts, latest
backup run, and latest scheduler activity. Link it from the API index and reuse
schedule serialization between host summaries and status output.

Cover the endpoint with a focused API test and document the new status URL.
2026-05-19 11:46:22 +02:00
ccd89119da (feature) Add staff-only JSON inspection API
Expose lightweight Django JSON endpoints for hosts, snapshots, and backup runs
using the existing admin/staff authentication boundary. Include filters for
snapshot and run inspection, return resolved snapshot base metadata, and document
the new /api/ entrypoint.

Add endpoint tests for authentication, host summaries, snapshot lineage payloads,
and run filtering.
2026-05-19 11:43:50 +02:00
4 changed files with 350 additions and 1 deletions

View File

@@ -35,6 +35,11 @@ The admin is available at:
- http://127.0.0.1:8000/admin/
Staff-only JSON endpoints are available at:
- http://127.0.0.1:8000/api/
- http://127.0.0.1:8000/api/status/
## SQL-First Setup
Create global config:
@@ -112,6 +117,8 @@ docker compose up --build web
This starts Django on:
- http://127.0.0.1:8010/admin/
- http://127.0.0.1:8010/api/
- http://127.0.0.1:8010/api/status/
Run the scheduler alongside the web admin:
@@ -142,6 +149,7 @@ Discovered snapshots are stored in `SnapshotRecord`, including the base snapshot
base record when it is known.
The Django retention command plans from `SnapshotRecord` instead of rediscovering snapshots from the filesystem.
Post-backup pruning from Django also uses the SQL retention service after the completed snapshot is recorded.
Staff-only JSON endpoints expose service status, hosts, snapshots, and backup runs for lightweight inspection.
The remaining internal engine code still contains reusable backup primitives:
@@ -152,7 +160,6 @@ The remaining internal engine code still contains reusable backup primitives:
Next refactor targets:
- Surface `SnapshotRecord` data through API/admin views instead of filesystem inspection.
- Move more snapshot lifecycle details into typed domain objects.
- Replace remaining dictionary-shaped config at engine boundaries.
- Remove legacy YAML import/export once production migration no longer needs it.

194
src/pobsync_backend/api.py Normal file
View File

@@ -0,0 +1,194 @@
from __future__ import annotations
from typing import Any
from django.contrib.admin.views.decorators import staff_member_required
from django.db import connection
from django.db.models import Count
from django.http import JsonResponse
from django.utils import timezone
from .models import BackupRun, HostConfig, ScheduleConfig, SnapshotRecord
@staff_member_required
def api_index(request) -> JsonResponse:
return JsonResponse(
{
"ok": True,
"endpoints": {
"status": request.build_absolute_uri("/api/status/"),
"hosts": request.build_absolute_uri("/api/hosts/"),
"snapshots": request.build_absolute_uri("/api/snapshots/"),
"runs": request.build_absolute_uri("/api/runs/"),
},
}
)
@staff_member_required
def status(request) -> JsonResponse:
latest_run = BackupRun.objects.select_related("host", "snapshot").order_by("-created_at").first()
latest_schedule = ScheduleConfig.objects.select_related("host").order_by("-last_started_at", "-updated_at").first()
return JsonResponse(
{
"ok": True,
"generated_at": timezone.now().isoformat(),
"database": {
"vendor": connection.vendor,
"engine": connection.settings_dict["ENGINE"],
},
"counts": {
"hosts": HostConfig.objects.count(),
"enabled_hosts": HostConfig.objects.filter(enabled=True).count(),
"schedules": ScheduleConfig.objects.count(),
"enabled_schedules": ScheduleConfig.objects.filter(enabled=True).count(),
"snapshots": SnapshotRecord.objects.count(),
"runs": BackupRun.objects.count(),
"running_runs": BackupRun.objects.filter(status=BackupRun.Status.RUNNING).count(),
"failed_runs": BackupRun.objects.filter(status=BackupRun.Status.FAILED).count(),
},
"latest_run": None if latest_run is None else _run_payload(latest_run),
"latest_schedule": None if latest_schedule is None else _schedule_payload(latest_schedule),
}
)
@staff_member_required
def hosts(request) -> JsonResponse:
host_qs = (
HostConfig.objects.annotate(snapshot_count=Count("snapshots", distinct=True), run_count=Count("runs", distinct=True))
.select_related("schedule")
.order_by("host")
)
return JsonResponse({"ok": True, "hosts": [_host_payload(host) for host in host_qs]})
@staff_member_required
def snapshots(request) -> JsonResponse:
snapshot_qs = SnapshotRecord.objects.select_related("host", "base").order_by("host__host", "-started_at", "dirname")
host_filter = request.GET.get("host")
kind_filter = request.GET.get("kind")
if host_filter:
snapshot_qs = snapshot_qs.filter(host__host=host_filter)
if kind_filter:
snapshot_qs = snapshot_qs.filter(kind=kind_filter)
limit = _limit_from_request(request)
return JsonResponse({"ok": True, "snapshots": [_snapshot_payload(snapshot) for snapshot in snapshot_qs[:limit]]})
@staff_member_required
def runs(request) -> JsonResponse:
run_qs = BackupRun.objects.select_related("host", "snapshot").order_by("-created_at")
host_filter = request.GET.get("host")
status_filter = request.GET.get("status")
if host_filter:
run_qs = run_qs.filter(host__host=host_filter)
if status_filter:
run_qs = run_qs.filter(status=status_filter)
limit = _limit_from_request(request)
return JsonResponse({"ok": True, "runs": [_run_payload(run) for run in run_qs[:limit]]})
def _host_payload(host: HostConfig) -> dict[str, Any]:
try:
schedule = host.schedule
except ScheduleConfig.DoesNotExist:
schedule = None
return {
"host": host.host,
"address": host.address,
"enabled": host.enabled,
"snapshot_count": host.snapshot_count,
"run_count": host.run_count,
"retention": {
"daily": host.retention_daily,
"weekly": host.retention_weekly,
"monthly": host.retention_monthly,
"yearly": host.retention_yearly,
},
"schedule": None
if schedule is None
else _schedule_payload(schedule),
}
def _snapshot_payload(snapshot: SnapshotRecord) -> dict[str, Any]:
return {
"host": snapshot.host.host,
"kind": snapshot.kind,
"dirname": snapshot.dirname,
"path": snapshot.path,
"status": snapshot.status,
"started_at": _iso(snapshot.started_at),
"ended_at": _iso(snapshot.ended_at),
"discovered_at": _iso(snapshot.discovered_at),
"base": _base_payload(snapshot),
}
def _base_payload(snapshot: SnapshotRecord) -> dict[str, Any] | None:
if snapshot.base is not None:
return {
"kind": snapshot.base.kind,
"dirname": snapshot.base.dirname,
"path": snapshot.base.path,
"resolved": True,
}
if snapshot.base_kind and snapshot.base_dirname:
return {
"kind": snapshot.base_kind,
"dirname": snapshot.base_dirname,
"path": snapshot.base_path,
"snapshot_id": snapshot.base_snapshot_id,
"resolved": False,
}
return None
def _run_payload(run: BackupRun) -> dict[str, Any]:
return {
"id": run.pk,
"host": run.host.host,
"run_type": run.run_type,
"status": run.status,
"started_at": _iso(run.started_at),
"ended_at": _iso(run.ended_at),
"snapshot": None
if run.snapshot is None
else {
"kind": run.snapshot.kind,
"dirname": run.snapshot.dirname,
"path": run.snapshot.path,
},
"snapshot_path": run.snapshot_path,
"base_path": run.base_path,
"rsync_exit_code": run.rsync_exit_code,
}
def _schedule_payload(schedule: ScheduleConfig) -> dict[str, Any]:
return {
"host": schedule.host.host,
"cron_expr": schedule.cron_expr,
"enabled": schedule.enabled,
"prune": schedule.prune,
"last_due_key": schedule.last_due_key,
"last_status": schedule.last_status,
"last_started_at": _iso(schedule.last_started_at),
"last_finished_at": _iso(schedule.last_finished_at),
}
def _limit_from_request(request, *, default: int = 100, maximum: int = 500) -> int:
value = request.GET.get("limit", str(default))
try:
limit = int(value)
except ValueError:
return default
return max(1, min(limit, maximum))
def _iso(value) -> str | None:
return value.isoformat() if value is not None else None

View File

@@ -0,0 +1,141 @@
from __future__ import annotations
from datetime import datetime, timezone
from django.contrib.auth import get_user_model
from django.test import TestCase
from django.utils import timezone as django_timezone
from pobsync_backend.models import BackupRun, HostConfig, ScheduleConfig, SnapshotRecord
class ApiTests(TestCase):
def setUp(self) -> None:
user_model = get_user_model()
self.staff_user = user_model.objects.create_user(
username="admin",
password="secret",
is_staff=True,
is_superuser=True,
)
def test_api_requires_staff_login(self) -> None:
response = self.client.get("/api/hosts/")
self.assertEqual(response.status_code, 302)
self.assertIn("/admin/login/", response["Location"])
def test_hosts_endpoint_returns_counts_and_schedule(self) -> None:
self.client.force_login(self.staff_user)
host = HostConfig.objects.create(host="web-01", address="web-01.example.test")
ScheduleConfig.objects.create(host=host, cron_expr="15 2 * * *", enabled=True, prune=True)
snapshot = self._snapshot(host, "20260519-021500Z__ABCDEFGH")
BackupRun.objects.create(host=host, status=BackupRun.Status.SUCCESS, snapshot=snapshot)
response = self.client.get("/api/hosts/")
self.assertEqual(response.status_code, 200)
payload = response.json()
self.assertTrue(payload["ok"])
self.assertEqual(payload["hosts"][0]["host"], "web-01")
self.assertEqual(payload["hosts"][0]["snapshot_count"], 1)
self.assertEqual(payload["hosts"][0]["run_count"], 1)
self.assertEqual(payload["hosts"][0]["schedule"]["cron_expr"], "15 2 * * *")
self.assertTrue(payload["hosts"][0]["schedule"]["prune"])
def test_snapshots_endpoint_filters_and_returns_base_payload(self) -> None:
self.client.force_login(self.staff_user)
host = HostConfig.objects.create(host="web-01", address="web-01.example.test")
other_host = HostConfig.objects.create(host="db-01", address="db-01.example.test")
base = self._snapshot(host, "20260518-021500Z__BASESNAP")
self._snapshot(other_host, "20260519-021500Z__OTHERSNP")
child = self._snapshot(host, "20260519-021500Z__CHILDSNP", base=base)
response = self.client.get("/api/snapshots/", {"host": host.host, "kind": "scheduled"})
self.assertEqual(response.status_code, 200)
snapshots = response.json()["snapshots"]
self.assertEqual([snapshot["dirname"] for snapshot in snapshots], [child.dirname, base.dirname])
self.assertEqual(snapshots[0]["base"]["dirname"], base.dirname)
self.assertTrue(snapshots[0]["base"]["resolved"])
def test_runs_endpoint_filters_by_status_and_limit(self) -> None:
self.client.force_login(self.staff_user)
host = HostConfig.objects.create(host="web-01", address="web-01.example.test")
snapshot = self._snapshot(host, "20260519-021500Z__ABCDEFGH")
BackupRun.objects.create(host=host, status=BackupRun.Status.FAILED)
BackupRun.objects.create(host=host, status=BackupRun.Status.SUCCESS, snapshot=snapshot)
response = self.client.get("/api/runs/", {"host": host.host, "status": "success", "limit": "1"})
self.assertEqual(response.status_code, 200)
runs = response.json()["runs"]
self.assertEqual(len(runs), 1)
self.assertEqual(runs[0]["status"], "success")
self.assertEqual(runs[0]["snapshot"]["dirname"], snapshot.dirname)
def test_api_index_lists_endpoints(self) -> None:
self.client.force_login(self.staff_user)
response = self.client.get("/api/")
self.assertEqual(response.status_code, 200)
endpoints = response.json()["endpoints"]
self.assertEqual(endpoints["status"], "http://testserver/api/status/")
self.assertEqual(endpoints["hosts"], "http://testserver/api/hosts/")
self.assertEqual(endpoints["snapshots"], "http://testserver/api/snapshots/")
self.assertEqual(endpoints["runs"], "http://testserver/api/runs/")
def test_status_endpoint_returns_counts_and_latest_activity(self) -> None:
self.client.force_login(self.staff_user)
host = HostConfig.objects.create(host="web-01", address="web-01.example.test")
schedule = ScheduleConfig.objects.create(
host=host,
cron_expr="15 2 * * *",
enabled=True,
prune=True,
last_due_key="202605190215",
last_status="success",
last_started_at=django_timezone.now(),
)
snapshot = self._snapshot(host, "20260519-021500Z__ABCDEFGH")
BackupRun.objects.create(
host=host,
status=BackupRun.Status.SUCCESS,
snapshot=snapshot,
started_at=datetime(2026, 5, 19, 2, 15, tzinfo=timezone.utc),
)
response = self.client.get("/api/status/")
self.assertEqual(response.status_code, 200)
payload = response.json()
self.assertTrue(payload["ok"])
self.assertEqual(payload["database"]["vendor"], "sqlite")
self.assertEqual(payload["counts"]["hosts"], 1)
self.assertEqual(payload["counts"]["enabled_hosts"], 1)
self.assertEqual(payload["counts"]["enabled_schedules"], 1)
self.assertEqual(payload["counts"]["snapshots"], 1)
self.assertEqual(payload["counts"]["runs"], 1)
self.assertEqual(payload["latest_run"]["host"], host.host)
self.assertEqual(payload["latest_run"]["snapshot"]["dirname"], snapshot.dirname)
self.assertEqual(payload["latest_schedule"]["host"], host.host)
self.assertEqual(payload["latest_schedule"]["last_due_key"], schedule.last_due_key)
def _snapshot(
self,
host: HostConfig,
dirname: str,
*,
base: SnapshotRecord | None = None,
) -> SnapshotRecord:
started_at = datetime.strptime(dirname.split("__", 1)[0], "%Y%m%d-%H%M%SZ").replace(tzinfo=timezone.utc)
return SnapshotRecord.objects.create(
host=host,
kind=SnapshotRecord.Kind.SCHEDULED,
dirname=dirname,
path=f"/backups/{host.host}/scheduled/{dirname}",
status="success",
started_at=started_at,
base=base,
)

View File

@@ -3,7 +3,14 @@ from __future__ import annotations
from django.contrib import admin
from django.urls import path
from pobsync_backend import api
urlpatterns = [
path("api/", api.api_index),
path("api/status/", api.status),
path("api/hosts/", api.hosts),
path("api/snapshots/", api.snapshots),
path("api/runs/", api.runs),
path("admin/", admin.site.urls),
]