Files
pobsync/src/pobsync_backend/admin.py
Peter van Arkel df3dcc47c9 (feature) Generate filesystem-backed SSH credentials
Add filesystem-backed SSH credentials for the native systemd deployment
path. Generated keys are stored below POBSYNC_HOME with 0600
permissions, while Django keeps the public key, fingerprint, path, and
selection metadata.

Add a Django SSH key generation view, delete action for unused generated
keys, and a management command used by the installer to ensure a default
backup key exists.

Update runtime config to use generated key paths directly as IdentityFile,
extend host checks to verify key readability, and keep legacy uploaded
keys available for compatibility.
2026-05-19 19:41:40 +02:00

188 lines
7.7 KiB
Python

from __future__ import annotations
from django.contrib import admin
from django.db.models import Count
from django.urls import reverse
from django.utils.html import format_html
from django.utils.http import urlencode
from .models import BackupRun, GlobalConfig, HostConfig, ScheduleConfig, SnapshotRecord, SshCredential
@admin.register(SshCredential)
class SshCredentialAdmin(admin.ModelAdmin):
    """Admin configuration for ``SshCredential`` records.

    The changelist shows boolean indicators for the presence of a public key
    and known-hosts data rather than the raw text values.
    """

    list_display = (
        "name",
        "key_type",
        "generated",
        "has_public_key",
        "has_known_hosts",
        "updated_at",
    )
    search_fields = ("name", "notes")
    readonly_fields = ("created_at", "updated_at", "fingerprint")
    fieldsets = (
        (
            None,
            {"fields": ("name", "key_type", "generated", "key_path", "fingerprint")},
        ),
        (
            "Key material",
            {"fields": ("private_key", "public_key", "known_hosts", "notes")},
        ),
        (
            "Timestamps",
            {"fields": ("created_at", "updated_at"), "classes": ("collapse",)},
        ),
    )

    @admin.display(boolean=True, description="Public key")
    def has_public_key(self, obj: SshCredential) -> bool:
        """Return True when ``public_key`` contains non-whitespace text."""
        stripped_key = obj.public_key.strip()
        return bool(stripped_key)

    @admin.display(boolean=True, description="Known hosts")
    def has_known_hosts(self, obj: SshCredential) -> bool:
        """Return True when ``known_hosts`` contains non-whitespace text."""
        stripped_hosts = obj.known_hosts.strip()
        return bool(stripped_hosts)
@admin.register(GlobalConfig)
class GlobalConfigAdmin(admin.ModelAdmin):
    """Admin configuration for ``GlobalConfig`` records.

    The edit form groups fields into SSH, rsync, default, and retention
    sections; the legacy JSON blob and timestamps are collapsed.
    """

    list_display = ("name", "backup_root", "ssh_user", "ssh_port", "updated_at")
    readonly_fields = ("created_at", "updated_at")
    fieldsets = (
        (
            None,
            {"fields": ("name", "backup_root", "pobsync_home")},
        ),
        (
            "SSH",
            {"fields": ("default_ssh_credential", "ssh_user", "ssh_port", "ssh_options")},
        ),
        (
            "Rsync",
            {
                "fields": (
                    "rsync_binary",
                    "rsync_args",
                    "rsync_extra_args",
                    "rsync_timeout_seconds",
                    "rsync_bwlimit_kbps",
                ),
            },
        ),
        (
            "Defaults",
            {
                "fields": (
                    "default_source_root",
                    "default_destination_subdir",
                    "excludes_default",
                ),
            },
        ),
        (
            "Retention defaults",
            {
                "fields": (
                    "retention_daily",
                    "retention_weekly",
                    "retention_monthly",
                    "retention_yearly",
                ),
            },
        ),
        (
            "Legacy JSON",
            {"fields": ("data",), "classes": ("collapse",)},
        ),
        (
            "Timestamps",
            {"fields": ("created_at", "updated_at"), "classes": ("collapse",)},
        ),
    )
@admin.register(HostConfig)
class HostConfigAdmin(admin.ModelAdmin):
    """Admin configuration for ``HostConfig`` records.

    The changelist shows each host's schedule state, linked snapshot and
    backup-run counts, and the state of its most recent run.
    """

    list_display = (
        "host",
        "address",
        "enabled",
        "schedule_state",
        "snapshot_count_link",
        "backup_run_count_link",
        "latest_run_state",
        "updated_at",
    )
    list_filter = ("enabled",)
    search_fields = ("host", "address")
    # ``schedule_state`` reads ``obj.schedule`` for every changelist row.
    # Joining the related schedule up front avoids one extra query per host.
    list_select_related = ("schedule",)
    readonly_fields = ("created_at", "updated_at")
    fieldsets = (
        (None, {"fields": ("host", "address", "enabled")}),
        ("SSH override", {"fields": ("ssh_credential", "ssh_user", "ssh_port")}),
        ("Source", {"fields": ("source_root", "includes", "excludes_add", "excludes_replace")}),
        ("Rsync override", {"fields": ("rsync_extra_args",)}),
        ("Retention", {"fields": ("retention_daily", "retention_weekly", "retention_monthly", "retention_yearly")}),
        ("Legacy JSON", {"fields": ("config",), "classes": ("collapse",)}),
        ("Timestamps", {"fields": ("created_at", "updated_at"), "classes": ("collapse",)}),
    )

    def get_queryset(self, request):
        """Return the base queryset annotated with snapshot and run counts.

        ``distinct=True`` is needed because both relations are joined in the
        same query; without it each count would be multiplied by the other.
        """
        return super().get_queryset(request).annotate(
            snapshot_count=Count("snapshots", distinct=True),
            backup_run_count=Count("runs", distinct=True),
        )

    @admin.display(description="Schedule")
    def schedule_state(self, obj: HostConfig) -> str:
        """Return ``"none"``, ``"disabled"``, or the schedule's cron expression."""
        try:
            schedule = obj.schedule
        except ScheduleConfig.DoesNotExist:
            # The host has no related schedule row.
            return "none"
        if not schedule.enabled:
            return "disabled"
        return schedule.cron_expr

    @admin.display(description="Snapshots", ordering="snapshot_count")
    def snapshot_count_link(self, obj: HostConfig) -> str:
        """Return the snapshot count linked to the filtered snapshot changelist."""
        count = getattr(obj, "snapshot_count", None)
        if count is None:
            # The annotation is absent when obj did not come from get_queryset().
            count = obj.snapshots.count()
        url = _admin_changelist_url("pobsync_backend", "snapshotrecord", {"host__id__exact": obj.pk})
        return format_html('<a href="{}">{}</a>', url, count)

    @admin.display(description="Runs", ordering="backup_run_count")
    def backup_run_count_link(self, obj: HostConfig) -> str:
        """Return the backup-run count linked to the filtered run changelist."""
        count = getattr(obj, "backup_run_count", None)
        if count is None:
            # The annotation is absent when obj did not come from get_queryset().
            count = obj.runs.count()
        url = _admin_changelist_url("pobsync_backend", "backuprun", {"host__id__exact": obj.pk})
        return format_html('<a href="{}">{}</a>', url, count)

    @admin.display(description="Latest run")
    def latest_run_state(self, obj: HostConfig) -> str:
        """Return the most recently created run's status, with start time if set.

        Returns ``"none"`` when the host has no runs.
        """
        # NOTE: this issues one query per changelist row.
        latest = obj.runs.order_by("-created_at").first()
        if latest is None:
            return "none"
        return f"{latest.status} {latest.started_at:%Y-%m-%d %H:%M}" if latest.started_at else latest.status
@admin.register(BackupRun)
class BackupRunAdmin(admin.ModelAdmin):
    """Admin configuration for ``BackupRun`` records.

    Each changelist row links to the change page of the run's snapshot when
    one is attached.
    """

    list_display = (
        "host",
        "run_type",
        "status",
        "started_at",
        "ended_at",
        "snapshot_link",
    )
    list_filter = ("run_type", "status", "started_at")
    date_hierarchy = "started_at"
    search_fields = (
        "host__host",
        "snapshot_path",
        "snapshot__dirname",
        "snapshot__path",
    )
    autocomplete_fields = ("snapshot",)
    list_select_related = ("host", "snapshot")

    @admin.display(description="Snapshot", ordering="snapshot__dirname")
    def snapshot_link(self, obj: BackupRun) -> str:
        """Return a link to the run's snapshot, or an empty string if none."""
        snapshot = obj.snapshot
        if snapshot is None:
            return ""
        change_url = reverse(
            "admin:pobsync_backend_snapshotrecord_change",
            args=[snapshot.pk],
        )
        return format_html('<a href="{}">{}</a>', change_url, snapshot.dirname)
@admin.register(SnapshotRecord)
class SnapshotRecordAdmin(admin.ModelAdmin):
    """Admin configuration for ``SnapshotRecord`` records.

    The changelist links each snapshot to its base snapshot (when recorded)
    and to the backup runs that reference it.
    """

    list_display = (
        "host",
        "kind",
        "dirname",
        "status",
        "base_link",
        "backup_run_count_link",
        "started_at",
        "discovered_at",
    )
    list_filter = ("kind", "status", "base_kind", "started_at", "discovered_at")
    search_fields = (
        "host__host",
        "dirname",
        "path",
        "base__dirname",
        "base_path",
        "base_snapshot_id",
    )
    autocomplete_fields = ("base",)
    list_select_related = ("host", "base")
    readonly_fields = ("discovered_at",)

    def get_queryset(self, request):
        """Return the base queryset annotated with ``backup_run_count``."""
        queryset = super().get_queryset(request)
        return queryset.annotate(backup_run_count=Count("backup_runs"))

    @admin.display(description="Base", ordering="base__dirname")
    def base_link(self, obj: SnapshotRecord) -> str:
        """Return a link to the base snapshot, its stored dirname, or ``""``.

        A related ``base`` record takes precedence; otherwise the plain
        ``base_dirname`` value is shown when it is non-empty.
        """
        base = obj.base
        if base is None:
            return obj.base_dirname or ""
        change_url = reverse(
            "admin:pobsync_backend_snapshotrecord_change",
            args=[base.pk],
        )
        return format_html('<a href="{}">{}</a>', change_url, base.dirname)

    @admin.display(description="Runs", ordering="backup_run_count")
    def backup_run_count_link(self, obj: SnapshotRecord) -> str:
        """Return the backup-run count linked to the filtered run changelist."""
        total = getattr(obj, "backup_run_count", None)
        if total is None:
            # The annotation is absent when obj did not come from get_queryset().
            total = obj.backup_runs.count()
        changelist_url = _admin_changelist_url(
            "pobsync_backend",
            "backuprun",
            {"snapshot__id__exact": obj.pk},
        )
        return format_html('<a href="{}">{}</a>', changelist_url, total)
@admin.register(ScheduleConfig)
class ScheduleConfigAdmin(admin.ModelAdmin):
    """Admin configuration for ``ScheduleConfig`` records."""

    list_display = (
        "host",
        "cron_expr",
        "enabled",
        "prune",
        "last_status",
        "last_started_at",
        "updated_at",
    )
    list_filter = (
        "enabled",
        "prune",
        "last_status",
    )
    search_fields = (
        "host__host",
        "cron_expr",
    )
    # Fetch the related host in the changelist query.
    list_select_related = ("host",)
def _admin_changelist_url(app_label: str, model_name: str, params: dict[str, object]) -> str:
    """Build an admin changelist URL for a model with a filter query string.

    Args:
        app_label: App label used in the admin URL name.
        model_name: Lowercase model name used in the admin URL name.
        params: Query parameters to URL-encode and append after ``?``.

    Returns:
        The reversed changelist URL followed by ``?`` and the encoded params.
    """
    view_name = f"admin:{app_label}_{model_name}_changelist"
    changelist_url = reverse(view_name)
    query_string = urlencode(params)
    return changelist_url + "?" + query_string