from __future__ import annotations import json import urllib.error import urllib.request from dataclasses import dataclass from typing import Any from django.conf import settings from django.core.mail import send_mail from django.utils import timezone from .models import BackupRun, NotificationDelivery, NotificationTarget TERMINAL_RUN_STATUSES = { BackupRun.Status.SUCCESS, BackupRun.Status.WARNING, BackupRun.Status.FAILED, BackupRun.Status.CANCELLED, } @dataclass(frozen=True) class DeliveryResult: target: NotificationTarget delivery: NotificationDelivery sent: bool def notify_backup_run_completed(run: BackupRun) -> list[DeliveryResult]: if run.status not in TERMINAL_RUN_STATUSES: return [] targets = [target for target in NotificationTarget.objects.filter(enabled=True) if _target_wants_status(target, run.status)] return [_notify_target(target=target, run=run) for target in targets] def _target_wants_status(target: NotificationTarget, status: str) -> bool: statuses = target.statuses if not isinstance(statuses, list): return False return status in {str(item) for item in statuses} def _notify_target(*, target: NotificationTarget, run: BackupRun) -> DeliveryResult: payload = _run_payload(run) delivery, created = NotificationDelivery.objects.get_or_create( target=target, run=run, defaults={ "status": NotificationDelivery.Status.SKIPPED, "payload": payload, }, ) if not created: return DeliveryResult(target=target, delivery=delivery, sent=False) try: if target.channel == NotificationTarget.Channel.EMAIL: _send_email(target=target, run=run, payload=payload) elif target.channel == NotificationTarget.Channel.WEBHOOK: _send_webhook(target=target, payload=payload) else: raise ValueError(f"Unsupported notification channel: {target.channel}") except Exception as exc: delivery.status = NotificationDelivery.Status.FAILED delivery.error = str(exc) delivery.save(update_fields=["status", "error"]) target.last_status = NotificationDelivery.Status.FAILED target.last_error = str(exc) target.save(update_fields=["last_status", "last_error", "updated_at"]) return DeliveryResult(target=target, delivery=delivery, sent=False) delivery.status = NotificationDelivery.Status.SENT delivery.save(update_fields=["status"]) target.last_status = NotificationDelivery.Status.SENT target.last_error = "" target.last_sent_at = timezone.now() target.save(update_fields=["last_status", "last_error", "last_sent_at", "updated_at"]) return DeliveryResult(target=target, delivery=delivery, sent=True) def _send_email(*, target: NotificationTarget, run: BackupRun, payload: dict[str, Any]) -> None: recipients = [line.strip() for line in target.email_to.replace(",", "\n").splitlines() if line.strip()] if not recipients: raise ValueError("Email notification target has no recipients.") subject = f"pobsync {run.status}: {run.host.host} run {run.id}" message = _email_message(payload) from_email = getattr(settings, "DEFAULT_FROM_EMAIL", "") or "pobsync@localhost" sent = send_mail(subject, message, from_email, recipients, fail_silently=False) if sent == 0: raise ValueError("Django email backend reported zero sent messages.") def _send_webhook(*, target: NotificationTarget, payload: dict[str, Any]) -> None: if not target.webhook_url: raise ValueError("Webhook notification target has no URL.") headers = {"Content-Type": "application/json", **_string_headers(target.webhook_headers)} request = urllib.request.Request( target.webhook_url, data=json.dumps(payload).encode("utf-8"), headers=headers, method="POST", ) try: with urllib.request.urlopen(request, timeout=10) as response: if response.status >= 400: raise ValueError(f"Webhook returned HTTP {response.status}.") except urllib.error.HTTPError as exc: raise ValueError(f"Webhook returned HTTP {exc.code}.") from exc def _string_headers(headers: object) -> dict[str, str]: if not isinstance(headers, dict): return {} return {str(key): str(value) for key, value in headers.items() if str(key).strip()} def _run_payload(run: BackupRun) -> dict[str, Any]: result = run.result if isinstance(run.result, dict) else {} failure = result.get("failure") if isinstance(result.get("failure"), dict) else {} prune = result.get("prune") if isinstance(result.get("prune"), dict) else {} return { "event": "backup_run.completed", "run": { "id": run.id, "host": run.host.host, "type": run.run_type, "status": run.status, "started_at": run.started_at.isoformat() if run.started_at else None, "ended_at": run.ended_at.isoformat() if run.ended_at else None, "snapshot": run.snapshot_path, "rsync_exit_code": run.rsync_exit_code, }, "failure": { "category": failure.get("category"), "message": failure.get("message") or result.get("error"), "hint": failure.get("hint"), }, "prune": { "ok": prune.get("ok") if prune else None, "error": prune.get("error") if prune else "", }, } def _email_message(payload: dict[str, Any]) -> str: run = payload["run"] lines = [ f"Host: {run['host']}", f"Run: {run['id']}", f"Type: {run['type']}", f"Status: {run['status']}", f"Started: {run['started_at'] or '-'}", f"Ended: {run['ended_at'] or '-'}", f"Snapshot: {run['snapshot'] or '-'}", f"Rsync exit code: {run['rsync_exit_code'] if run['rsync_exit_code'] is not None else '-'}", ] failure = payload.get("failure") if isinstance(payload.get("failure"), dict) else {} if failure.get("message"): lines.extend(["", f"Failure: {failure['message']}"]) prune = payload.get("prune") if isinstance(payload.get("prune"), dict) else {} if prune.get("error"): lines.extend(["", f"Retention: {prune['error']}"]) return "\n".join(lines)