Drop the unused GlobalConfig.data field and remove the remaining YAML config path helpers from PobsyncPaths. Keep HostConfig.config as runtime state for preflight data, and relabel it in the admin so it no longer reads as legacy compatibility storage.
187 lines
7.6 KiB
Python
187 lines
7.6 KiB
Python
from __future__ import annotations
|
|
|
|
from django.contrib import admin
|
|
from django.db.models import Count
|
|
from django.urls import reverse
|
|
from django.utils.html import format_html
|
|
from django.utils.http import urlencode
|
|
|
|
from .models import BackupRun, GlobalConfig, HostConfig, ScheduleConfig, SnapshotRecord, SshCredential
|
|
|
|
|
|
@admin.register(SshCredential)
class SshCredentialAdmin(admin.ModelAdmin):
    """Admin options for ``SshCredential``: list columns, search and form layout."""

    search_fields = ("name", "notes")
    readonly_fields = ("created_at", "updated_at", "fingerprint")
    list_display = (
        "name",
        "key_type",
        "generated",
        "has_public_key",
        "has_known_hosts",
        "updated_at",
    )
    fieldsets = (
        (
            None,
            {"fields": ("name", "key_type", "generated", "key_path", "fingerprint")},
        ),
        (
            "Key material",
            {"fields": ("private_key", "public_key", "known_hosts", "notes")},
        ),
        (
            "Timestamps",
            {"fields": ("created_at", "updated_at"), "classes": ("collapse",)},
        ),
    )

    @admin.display(boolean=True, description="Public key")
    def has_public_key(self, obj: SshCredential) -> bool:
        """Return True when ``obj.public_key`` is non-empty after stripping whitespace."""
        stripped = obj.public_key.strip()
        return True if stripped else False

    @admin.display(boolean=True, description="Known hosts")
    def has_known_hosts(self, obj: SshCredential) -> bool:
        """Return True when ``obj.known_hosts`` is non-empty after stripping whitespace."""
        stripped = obj.known_hosts.strip()
        return True if stripped else False
|
|
|
|
|
|
@admin.register(GlobalConfig)
class GlobalConfigAdmin(admin.ModelAdmin):
    """Admin options for ``GlobalConfig``: list columns and grouped form sections."""

    readonly_fields = ("created_at", "updated_at")
    list_display = (
        "name",
        "backup_root",
        "ssh_user",
        "ssh_port",
        "updated_at",
    )
    fieldsets = (
        (
            None,
            {"fields": ("name", "backup_root")},
        ),
        (
            "SSH",
            {"fields": ("default_ssh_credential", "ssh_user", "ssh_port", "ssh_options")},
        ),
        (
            "Rsync",
            {"fields": ("rsync_binary", "rsync_args", "rsync_extra_args", "rsync_timeout_seconds", "rsync_bwlimit_kbps")},
        ),
        (
            "Defaults",
            {"fields": ("default_source_root", "default_destination_subdir", "excludes_default")},
        ),
        (
            "Retention defaults",
            {"fields": ("retention_daily", "retention_weekly", "retention_monthly", "retention_yearly")},
        ),
        (
            # Rendered collapsed by the admin via the "collapse" CSS class.
            "Timestamps",
            {"fields": ("created_at", "updated_at"), "classes": ("collapse",)},
        ),
    )
|
|
|
|
|
|
@admin.register(HostConfig)
class HostConfigAdmin(admin.ModelAdmin):
    """Admin options for ``HostConfig``.

    The changelist shows, per host, its schedule summary, linked snapshot and
    backup-run counts, and a summary of the most recently created run.
    """

    list_display = (
        "host",
        "address",
        "enabled",
        "schedule_state",
        "snapshot_count_link",
        "backup_run_count_link",
        "latest_run_state",
        "updated_at",
    )
    list_filter = ("enabled",)
    search_fields = ("host", "address")
    # ``schedule_state`` reads ``obj.schedule`` for every row; joining it into
    # the changelist query avoids one extra query per listed host.
    # NOTE(review): assumes ``schedule`` is the reverse one-to-one accessor from
    # ScheduleConfig (implied by the DoesNotExist handling below) -- confirm.
    list_select_related = ("schedule",)
    readonly_fields = ("created_at", "updated_at")
    fieldsets = (
        (None, {"fields": ("host", "address", "enabled")}),
        ("SSH override", {"fields": ("ssh_credential", "ssh_user", "ssh_port")}),
        ("Source", {"fields": ("source_root", "includes", "excludes_add", "excludes_replace")}),
        ("Rsync override", {"fields": ("rsync_extra_args",)}),
        ("Retention", {"fields": ("retention_daily", "retention_weekly", "retention_monthly", "retention_yearly")}),
        ("Runtime state", {"fields": ("config",), "classes": ("collapse",)}),
        ("Timestamps", {"fields": ("created_at", "updated_at"), "classes": ("collapse",)}),
    )

    def get_queryset(self, request):
        """Return the base queryset annotated with snapshot and run counts.

        The annotations back the ``snapshot_count_link`` and
        ``backup_run_count_link`` columns and make them sortable.
        """
        queryset = super().get_queryset(request)
        # distinct=True is required because two relations are aggregated in
        # the same query; without it the joins would inflate each count.
        return queryset.annotate(
            snapshot_count=Count("snapshots", distinct=True),
            backup_run_count=Count("runs", distinct=True),
        )

    @admin.display(description="Schedule")
    def schedule_state(self, obj: HostConfig) -> str:
        """Summarise the host's schedule.

        Returns ``"none"`` when no schedule exists, ``"disabled"`` when the
        schedule is not enabled, otherwise its cron expression.
        """
        try:
            schedule = obj.schedule
        except ScheduleConfig.DoesNotExist:
            return "none"
        return schedule.cron_expr if schedule.enabled else "disabled"

    @admin.display(description="Snapshots", ordering="snapshot_count")
    def snapshot_count_link(self, obj: HostConfig) -> str:
        """Render the snapshot count as a link to the host-filtered snapshot list."""
        count = getattr(obj, "snapshot_count", None)
        if count is None:
            # Object was not loaded through get_queryset(); count directly.
            count = obj.snapshots.count()
        url = _admin_changelist_url("pobsync_backend", "snapshotrecord", {"host__id__exact": obj.pk})
        return format_html('<a href="{}">{}</a>', url, count)

    @admin.display(description="Runs", ordering="backup_run_count")
    def backup_run_count_link(self, obj: HostConfig) -> str:
        """Render the backup-run count as a link to the host-filtered run list."""
        count = getattr(obj, "backup_run_count", None)
        if count is None:
            # Object was not loaded through get_queryset(); count directly.
            count = obj.runs.count()
        url = _admin_changelist_url("pobsync_backend", "backuprun", {"host__id__exact": obj.pk})
        return format_html('<a href="{}">{}</a>', url, count)

    @admin.display(description="Latest run")
    def latest_run_state(self, obj: HostConfig) -> str:
        """Describe the most recently created run for the host.

        Returns ``"none"`` when the host has no runs, the run status followed
        by its start time when ``started_at`` is set, otherwise the status only.
        """
        # NOTE: this issues one query per listed host.
        latest = obj.runs.order_by("-created_at").first()
        if latest is None:
            return "none"
        if latest.started_at:
            return f"{latest.status} {latest.started_at:%Y-%m-%d %H:%M}"
        return latest.status
|
|
|
|
|
|
@admin.register(BackupRun)
class BackupRunAdmin(admin.ModelAdmin):
    """Admin options for ``BackupRun``, including a link to the run's snapshot."""

    date_hierarchy = "started_at"
    list_select_related = ("host", "snapshot")
    autocomplete_fields = ("snapshot",)
    list_display = (
        "host",
        "run_type",
        "status",
        "started_at",
        "ended_at",
        "snapshot_link",
    )
    list_filter = (
        "run_type",
        "status",
        "started_at",
    )
    search_fields = (
        "host__host",
        "snapshot_path",
        "snapshot__dirname",
        "snapshot__path",
    )

    @admin.display(description="Snapshot", ordering="snapshot__dirname")
    def snapshot_link(self, obj: BackupRun) -> str:
        """Link to the related snapshot's change page, or "" when there is none."""
        snapshot = obj.snapshot
        if snapshot is None:
            return ""
        change_url = reverse(
            "admin:pobsync_backend_snapshotrecord_change",
            args=[snapshot.pk],
        )
        return format_html('<a href="{}">{}</a>', change_url, snapshot.dirname)
|
|
|
|
|
|
@admin.register(SnapshotRecord)
class SnapshotRecordAdmin(admin.ModelAdmin):
    """Admin options for ``SnapshotRecord`` with base-snapshot and run links."""

    readonly_fields = ("discovered_at",)
    list_select_related = ("host", "base")
    autocomplete_fields = ("base",)
    list_display = (
        "host",
        "kind",
        "dirname",
        "status",
        "base_link",
        "backup_run_count_link",
        "started_at",
        "discovered_at",
    )
    list_filter = (
        "kind",
        "status",
        "base_kind",
        "started_at",
        "discovered_at",
    )
    search_fields = ("host__host", "dirname", "path", "base__dirname", "base_path", "base_snapshot_id")

    def get_queryset(self, request):
        """Return the base queryset annotated with each snapshot's backup-run count."""
        base_queryset = super().get_queryset(request)
        return base_queryset.annotate(backup_run_count=Count("backup_runs"))

    @admin.display(description="Base", ordering="base__dirname")
    def base_link(self, obj: SnapshotRecord) -> str:
        """Show the base snapshot.

        Links to the base record when one is related; otherwise falls back to
        the stored ``base_dirname`` text, or "" when that is empty too.
        """
        parent = obj.base
        if parent is None:
            return obj.base_dirname or ""
        change_url = reverse(
            "admin:pobsync_backend_snapshotrecord_change",
            args=[parent.pk],
        )
        return format_html('<a href="{}">{}</a>', change_url, parent.dirname)

    @admin.display(description="Runs", ordering="backup_run_count")
    def backup_run_count_link(self, obj: SnapshotRecord) -> str:
        """Render the backup-run count as a link to the snapshot-filtered run list."""
        total = getattr(obj, "backup_run_count", None)
        if total is None:
            # No annotation on this instance; fall back to a direct count.
            total = obj.backup_runs.count()
        filters = {"snapshot__id__exact": obj.pk}
        target = _admin_changelist_url("pobsync_backend", "backuprun", filters)
        return format_html('<a href="{}">{}</a>', target, total)
|
|
|
|
|
|
@admin.register(ScheduleConfig)
class ScheduleConfigAdmin(admin.ModelAdmin):
    """Admin options for ``ScheduleConfig``: list columns, filters and search."""

    list_select_related = ("host",)
    search_fields = ("host__host", "cron_expr")
    list_filter = (
        "enabled",
        "prune",
        "last_status",
    )
    list_display = (
        "host",
        "cron_expr",
        "enabled",
        "prune",
        "last_status",
        "last_started_at",
        "updated_at",
    )
|
|
|
|
|
|
def _admin_changelist_url(app_label: str, model_name: str, params: dict[str, object]) -> str:
    """Build the admin changelist URL for a model, with optional query filters.

    Args:
        app_label: App label used in the admin URL name.
        model_name: Lower-case model name used in the admin URL name.
        params: Query-string parameters to append (e.g. changelist filters).

    Returns:
        The reversed ``admin:<app_label>_<model_name>_changelist`` URL, followed
        by ``?<encoded params>`` when ``params`` is non-empty.
    """
    base_url = reverse(f"admin:{app_label}_{model_name}_changelist")
    if not params:
        # Nothing to encode: avoid emitting a URL with a dangling "?".
        return base_url
    return f"{base_url}?{urlencode(params)}"
|