Record snapshot purge history whenever retention or incomplete cleanup removes snapshot directories and SQL records. Store the purge reason, original kind, path, action source, and triggering operator so manual, scheduled, CLI, and incomplete cleanup actions remain auditable after the original snapshot record is deleted. Add a staff-only Purged Snapshots page with host/action filters and register the audit model in Django admin. Refs #16 Refs #8
197 lines
8.0 KiB
Python
197 lines
8.0 KiB
Python
from __future__ import annotations
|
|
|
|
from django.contrib import admin
|
|
from django.db.models import Count
|
|
from django.urls import reverse
|
|
from django.utils.html import format_html
|
|
from django.utils.http import urlencode
|
|
|
|
from .models import BackupRun, GlobalConfig, HostConfig, PurgedSnapshot, ScheduleConfig, SnapshotRecord, SshCredential
|
|
|
|
|
|
@admin.register(SshCredential)
class SshCredentialAdmin(admin.ModelAdmin):
    """Admin configuration for SshCredential records.

    The list view shows boolean indicators for whether public-key and
    known-hosts text is present instead of the raw values.
    """

    list_display = (
        "name",
        "key_type",
        "generated",
        "has_public_key",
        "has_known_hosts",
        "updated_at",
    )
    readonly_fields = ("created_at", "updated_at", "fingerprint")
    search_fields = ("name", "notes")
    fieldsets = (
        (
            None,
            {"fields": ("name", "key_type", "generated", "key_path", "fingerprint")},
        ),
        (
            "Key material",
            {"fields": ("private_key", "public_key", "known_hosts", "notes")},
        ),
        (
            "Timestamps",
            {"fields": ("created_at", "updated_at"), "classes": ("collapse",)},
        ),
    )

    @admin.display(boolean=True, description="Public key")
    def has_public_key(self, obj: SshCredential) -> bool:
        """Return True when ``public_key`` holds any non-whitespace text."""
        stripped = obj.public_key.strip()
        return bool(stripped)

    @admin.display(boolean=True, description="Known hosts")
    def has_known_hosts(self, obj: SshCredential) -> bool:
        """Return True when ``known_hosts`` holds any non-whitespace text."""
        stripped = obj.known_hosts.strip()
        return bool(stripped)
|
|
|
|
|
|
@admin.register(GlobalConfig)
class GlobalConfigAdmin(admin.ModelAdmin):
    """Admin configuration for GlobalConfig, with the form grouped into sections."""

    list_display = ("name", "backup_root", "ssh_user", "ssh_port", "updated_at")
    readonly_fields = ("created_at", "updated_at")
    fieldsets = (
        (None, {"fields": ("name", "backup_root")}),
        (
            "SSH",
            {"fields": ("default_ssh_credential", "ssh_user", "ssh_port", "ssh_options")},
        ),
        (
            "Rsync",
            {
                "fields": (
                    "rsync_binary",
                    "rsync_args",
                    "rsync_extra_args",
                    "rsync_timeout_seconds",
                    "rsync_bwlimit_kbps",
                ),
            },
        ),
        (
            "Defaults",
            {
                "fields": (
                    "default_source_root",
                    "default_destination_subdir",
                    "excludes_default",
                ),
            },
        ),
        (
            "Retention defaults",
            {
                "fields": (
                    "retention_daily",
                    "retention_weekly",
                    "retention_monthly",
                    "retention_yearly",
                ),
            },
        ),
        # Timestamps are read-only and shown in a collapsed section.
        (
            "Timestamps",
            {"fields": ("created_at", "updated_at"), "classes": ("collapse",)},
        ),
    )
|
|
|
|
|
|
@admin.register(HostConfig)
class HostConfigAdmin(admin.ModelAdmin):
    """Admin configuration for HostConfig.

    The list view adds computed columns: the schedule state, linked
    snapshot and run counts, and a summary of the most recent run.
    """

    list_display = (
        "host",
        "address",
        "enabled",
        "schedule_state",
        "snapshot_count_link",
        "backup_run_count_link",
        "latest_run_state",
        "updated_at",
    )
    list_filter = ("enabled",)
    search_fields = ("host", "address")
    readonly_fields = ("created_at", "updated_at")
    fieldsets = (
        (None, {"fields": ("host", "address", "enabled")}),
        ("SSH override", {"fields": ("ssh_credential", "ssh_user", "ssh_port")}),
        ("Source", {"fields": ("source_root", "includes", "excludes_add", "excludes_replace")}),
        ("Rsync override", {"fields": ("rsync_extra_args",)}),
        ("Retention", {"fields": ("retention_daily", "retention_weekly", "retention_monthly", "retention_yearly")}),
        ("Runtime state", {"fields": ("config",), "classes": ("collapse",)}),
        ("Timestamps", {"fields": ("created_at", "updated_at"), "classes": ("collapse",)}),
    )

    def get_queryset(self, request):
        """Return the base queryset annotated with snapshot and run counts.

        ``distinct=True`` is required because two relations are counted in
        the same query; without it the joins would multiply each count.
        """
        return super().get_queryset(request).annotate(
            snapshot_count=Count("snapshots", distinct=True),
            backup_run_count=Count("runs", distinct=True),
        )

    @admin.display(description="Schedule")
    def schedule_state(self, obj: HostConfig) -> str:
        """Return "none", "disabled", or the schedule's cron expression."""
        try:
            schedule = obj.schedule
        except ScheduleConfig.DoesNotExist:
            # The host has no related schedule row.
            return "none"
        if not schedule.enabled:
            return "disabled"
        return schedule.cron_expr

    @admin.display(description="Snapshots", ordering="snapshot_count")
    def snapshot_count_link(self, obj: HostConfig) -> str:
        """Return the snapshot count as a link to the filtered snapshot changelist."""
        count = getattr(obj, "snapshot_count", None)
        if count is None:
            # Fallback for instances that did not come through get_queryset().
            count = obj.snapshots.count()
        url = _admin_changelist_url("pobsync_backend", "snapshotrecord", {"host__id__exact": obj.pk})
        return format_html('<a href="{}">{}</a>', url, count)

    @admin.display(description="Runs", ordering="backup_run_count")
    def backup_run_count_link(self, obj: HostConfig) -> str:
        """Return the run count as a link to the filtered backup-run changelist."""
        count = getattr(obj, "backup_run_count", None)
        if count is None:
            # Fallback for instances that did not come through get_queryset().
            count = obj.runs.count()
        url = _admin_changelist_url("pobsync_backend", "backuprun", {"host__id__exact": obj.pk})
        return format_html('<a href="{}">{}</a>', url, count)

    @admin.display(description="Latest run")
    def latest_run_state(self, obj: HostConfig) -> str:
        """Return the status of the most recently created run, with its start time.

        Returns "none" when the host has no runs, the bare status when the
        run has no ``started_at``, and otherwise ``"<status> YYYY-MM-DD HH:MM"``.
        An aware ``started_at`` is converted to the current time zone first,
        so the value matches the admin's own localized datetime columns.
        """
        from django.utils import timezone

        # NOTE: this issues one query per row of the changelist.
        latest = obj.runs.order_by("-created_at").first()
        if latest is None:
            return "none"

        started_at = latest.started_at
        if not started_at:
            return latest.status

        # localtime() rejects naive datetimes, so only convert aware values
        # (naive values occur when time-zone support is disabled).
        if timezone.is_aware(started_at):
            started_at = timezone.localtime(started_at)
        return f"{latest.status} {started_at:%Y-%m-%d %H:%M}"
|
|
|
|
|
|
@admin.register(BackupRun)
class BackupRunAdmin(admin.ModelAdmin):
    """Admin configuration for BackupRun, with a link to the related snapshot."""

    list_display = (
        "host",
        "run_type",
        "status",
        "started_at",
        "ended_at",
        "snapshot_link",
    )
    list_filter = ("run_type", "status", "started_at")
    search_fields = (
        "host__host",
        "snapshot_path",
        "snapshot__dirname",
        "snapshot__path",
    )
    autocomplete_fields = ("snapshot",)
    list_select_related = ("host", "snapshot")
    date_hierarchy = "started_at"

    @admin.display(description="Snapshot", ordering="snapshot__dirname")
    def snapshot_link(self, obj: BackupRun) -> str:
        """Return a link to the run's snapshot change page, or "" when there is none."""
        snapshot = obj.snapshot
        if snapshot is None:
            return ""
        change_url = reverse(
            "admin:pobsync_backend_snapshotrecord_change",
            args=[snapshot.pk],
        )
        return format_html('<a href="{}">{}</a>', change_url, snapshot.dirname)
|
|
|
|
|
|
@admin.register(SnapshotRecord)
class SnapshotRecordAdmin(admin.ModelAdmin):
    """Admin configuration for SnapshotRecord.

    The list view links each snapshot to its base snapshot and to the
    backup runs that reference it.
    """

    list_display = (
        "host",
        "kind",
        "dirname",
        "status",
        "base_link",
        "backup_run_count_link",
        "started_at",
        "discovered_at",
    )
    list_filter = ("kind", "status", "base_kind", "started_at", "discovered_at")
    search_fields = (
        "host__host",
        "dirname",
        "path",
        "base__dirname",
        "base_path",
        "base_snapshot_id",
    )
    autocomplete_fields = ("base",)
    list_select_related = ("host", "base")
    readonly_fields = ("discovered_at",)

    def get_queryset(self, request):
        """Return the base queryset annotated with the number of related backup runs."""
        queryset = super().get_queryset(request)
        return queryset.annotate(backup_run_count=Count("backup_runs"))

    @admin.display(description="Base", ordering="base__dirname")
    def base_link(self, obj: SnapshotRecord) -> str:
        """Describe the base snapshot.

        Returns a link to the base record when one is related, otherwise the
        stored ``base_dirname`` text, otherwise an empty string.
        """
        base = obj.base
        if base is None:
            # No related record: fall back to the stored directory name, if any.
            return obj.base_dirname or ""
        change_url = reverse(
            "admin:pobsync_backend_snapshotrecord_change",
            args=[base.pk],
        )
        return format_html('<a href="{}">{}</a>', change_url, base.dirname)

    @admin.display(description="Runs", ordering="backup_run_count")
    def backup_run_count_link(self, obj: SnapshotRecord) -> str:
        """Return the run count as a link to the backup-run changelist for this snapshot."""
        total = getattr(obj, "backup_run_count", None)
        if total is None:
            # Fallback for instances that did not come through get_queryset().
            total = obj.backup_runs.count()
        changelist_url = _admin_changelist_url(
            "pobsync_backend",
            "backuprun",
            {"snapshot__id__exact": obj.pk},
        )
        return format_html('<a href="{}">{}</a>', changelist_url, total)
|
|
|
|
|
|
@admin.register(PurgedSnapshot)
class PurgedSnapshotAdmin(admin.ModelAdmin):
    """Admin configuration for PurgedSnapshot records."""

    list_display = (
        "host_name",
        "kind",
        "dirname",
        "action",
        "reason",
        "triggered_by",
        "purged_at",
    )
    list_filter = ("action", "kind", "purged_at")
    search_fields = (
        "host_name",
        "dirname",
        "path",
        "reason",
        "triggered_by",
    )
    list_select_related = ("host",)
    # The purge timestamp is displayed but cannot be edited.
    readonly_fields = ("purged_at",)
    date_hierarchy = "purged_at"
|
|
|
|
|
|
@admin.register(ScheduleConfig)
class ScheduleConfigAdmin(admin.ModelAdmin):
    """Admin configuration for ScheduleConfig records."""

    list_display = (
        "host",
        "cron_expr",
        "enabled",
        "prune",
        "last_status",
        "last_started_at",
        "updated_at",
    )
    list_filter = ("enabled", "prune", "last_status")
    search_fields = ("host__host", "cron_expr")
    # Fetch the related host in the same query as the changelist rows.
    list_select_related = ("host",)
|
|
|
|
|
|
def _admin_changelist_url(app_label: str, model_name: str, params: dict[str, object]) -> str:
    """Build the admin changelist URL for a model, with optional filter parameters.

    Args:
        app_label: App label used in the admin URL name.
        model_name: Lower-case model name used in the admin URL name.
        params: Query-string parameters to append, URL-encoded.

    Returns:
        The changelist URL followed by ``?<encoded params>``, or the bare
        changelist URL when ``params`` is empty.
    """
    base_url = reverse(f"admin:{app_label}_{model_name}_changelist")
    if not params:
        # Avoid emitting a dangling "?" when there is nothing to filter on.
        return base_url
    return f"{base_url}?{urlencode(params)}"
|