(feature) Add run completion notifications

Add email and webhook notification targets with delivery tracking, and send
notifications when backup runs reach a terminal status.

Expose notification target management in the Django UI and keep delivery
failures recorded without failing the backup worker.
This commit is contained in:
2026-05-28 21:20:38 +02:00
parent 1f5c4e0756
commit 67ffd6101b
14 changed files with 819 additions and 4 deletions

View File

@@ -0,0 +1,168 @@
from __future__ import annotations
import json
import urllib.error
import urllib.request
from dataclasses import dataclass
from typing import Any
from django.conf import settings
from django.core.mail import send_mail
from django.utils import timezone
from .models import BackupRun, NotificationDelivery, NotificationTarget
TERMINAL_RUN_STATUSES = {
BackupRun.Status.SUCCESS,
BackupRun.Status.WARNING,
BackupRun.Status.FAILED,
BackupRun.Status.CANCELLED,
}
@dataclass(frozen=True)
class DeliveryResult:
target: NotificationTarget
delivery: NotificationDelivery
sent: bool
def notify_backup_run_completed(run: BackupRun) -> list[DeliveryResult]:
if run.status not in TERMINAL_RUN_STATUSES:
return []
targets = [target for target in NotificationTarget.objects.filter(enabled=True) if _target_wants_status(target, run.status)]
return [_notify_target(target=target, run=run) for target in targets]
def _target_wants_status(target: NotificationTarget, status: str) -> bool:
statuses = target.statuses
if not isinstance(statuses, list):
return False
return status in {str(item) for item in statuses}
def _notify_target(*, target: NotificationTarget, run: BackupRun) -> DeliveryResult:
payload = _run_payload(run)
delivery, created = NotificationDelivery.objects.get_or_create(
target=target,
run=run,
defaults={
"status": NotificationDelivery.Status.SKIPPED,
"payload": payload,
},
)
if not created:
return DeliveryResult(target=target, delivery=delivery, sent=False)
try:
if target.channel == NotificationTarget.Channel.EMAIL:
_send_email(target=target, run=run, payload=payload)
elif target.channel == NotificationTarget.Channel.WEBHOOK:
_send_webhook(target=target, payload=payload)
else:
raise ValueError(f"Unsupported notification channel: {target.channel}")
except Exception as exc:
delivery.status = NotificationDelivery.Status.FAILED
delivery.error = str(exc)
delivery.save(update_fields=["status", "error"])
target.last_status = NotificationDelivery.Status.FAILED
target.last_error = str(exc)
target.save(update_fields=["last_status", "last_error", "updated_at"])
return DeliveryResult(target=target, delivery=delivery, sent=False)
delivery.status = NotificationDelivery.Status.SENT
delivery.save(update_fields=["status"])
target.last_status = NotificationDelivery.Status.SENT
target.last_error = ""
target.last_sent_at = timezone.now()
target.save(update_fields=["last_status", "last_error", "last_sent_at", "updated_at"])
return DeliveryResult(target=target, delivery=delivery, sent=True)
def _send_email(*, target: NotificationTarget, run: BackupRun, payload: dict[str, Any]) -> None:
recipients = [line.strip() for line in target.email_to.replace(",", "\n").splitlines() if line.strip()]
if not recipients:
raise ValueError("Email notification target has no recipients.")
subject = f"pobsync {run.status}: {run.host.host} run {run.id}"
message = _email_message(payload)
from_email = getattr(settings, "DEFAULT_FROM_EMAIL", "") or "pobsync@localhost"
sent = send_mail(subject, message, from_email, recipients, fail_silently=False)
if sent == 0:
raise ValueError("Django email backend reported zero sent messages.")
def _send_webhook(*, target: NotificationTarget, payload: dict[str, Any]) -> None:
if not target.webhook_url:
raise ValueError("Webhook notification target has no URL.")
headers = {"Content-Type": "application/json", **_string_headers(target.webhook_headers)}
request = urllib.request.Request(
target.webhook_url,
data=json.dumps(payload).encode("utf-8"),
headers=headers,
method="POST",
)
try:
with urllib.request.urlopen(request, timeout=10) as response:
if response.status >= 400:
raise ValueError(f"Webhook returned HTTP {response.status}.")
except urllib.error.HTTPError as exc:
raise ValueError(f"Webhook returned HTTP {exc.code}.") from exc
def _string_headers(headers: object) -> dict[str, str]:
if not isinstance(headers, dict):
return {}
return {str(key): str(value) for key, value in headers.items() if str(key).strip()}
def _run_payload(run: BackupRun) -> dict[str, Any]:
result = run.result if isinstance(run.result, dict) else {}
failure = result.get("failure") if isinstance(result.get("failure"), dict) else {}
prune = result.get("prune") if isinstance(result.get("prune"), dict) else {}
return {
"event": "backup_run.completed",
"run": {
"id": run.id,
"host": run.host.host,
"type": run.run_type,
"status": run.status,
"started_at": run.started_at.isoformat() if run.started_at else None,
"ended_at": run.ended_at.isoformat() if run.ended_at else None,
"snapshot": run.snapshot_path,
"rsync_exit_code": run.rsync_exit_code,
},
"failure": {
"category": failure.get("category"),
"message": failure.get("message") or result.get("error"),
"hint": failure.get("hint"),
},
"prune": {
"ok": prune.get("ok") if prune else None,
"error": prune.get("error") if prune else "",
},
}
def _email_message(payload: dict[str, Any]) -> str:
run = payload["run"]
lines = [
f"Host: {run['host']}",
f"Run: {run['id']}",
f"Type: {run['type']}",
f"Status: {run['status']}",
f"Started: {run['started_at'] or '-'}",
f"Ended: {run['ended_at'] or '-'}",
f"Snapshot: {run['snapshot'] or '-'}",
f"Rsync exit code: {run['rsync_exit_code'] if run['rsync_exit_code'] is not None else '-'}",
]
failure = payload.get("failure") if isinstance(payload.get("failure"), dict) else {}
if failure.get("message"):
lines.extend(["", f"Failure: {failure['message']}"])
prune = payload.get("prune") if isinstance(payload.get("prune"), dict) else {}
if prune.get("error"):
lines.extend(["", f"Retention: {prune['error']}"])
return "\n".join(lines)