from __future__ import annotations

import math
from pathlib import Path
from typing import Any, Iterable

from django.utils import timezone

from pobsync.run_stats import filesystem_capacity

from .models import BackupRun, GlobalConfig, HostConfig, SnapshotRecord


def collect_dashboard_stats(*, hosts: Iterable[HostConfig], global_config: GlobalConfig | None) -> dict[str, Any]:
    """Build aggregate statistics over recent successful backup runs.

    Loads up to 100 successful runs (newest first), drops dry runs and runs
    without a stats payload, and summarises the remainder.

    Side effect: every object in ``hosts`` gets a ``stats_summary`` attribute
    holding the result of :func:`collect_host_stats` for that host.

    Args:
        hosts: Host configurations to annotate with per-host statistics.
        global_config: Global configuration; its ``backup_root`` is used to
            query the current filesystem capacity when set.

    Returns:
        A dict of aggregate values. Averages, the savings ratio and the
        "until full" estimates are ``None`` when there is no data to compute
        them from.
    """
    runs = list(
        BackupRun.objects.select_related("host", "snapshot")
        .filter(status=BackupRun.Status.SUCCESS)
        .order_by("-started_at", "-created_at")[:100]
    )
    real_runs = [_run_summary(run) for run in runs if _is_real_run(run)]
    real_runs = [run for run in real_runs if run["has_stats"]]

    for host in hosts:
        host.stats_summary = collect_host_stats(host=host)

    literal_values = _int_values(real_runs, "rsync", "literal_data_bytes")
    matched_values = _int_values(real_runs, "rsync", "matched_data_bytes")
    duration_values = _int_values(real_runs, "duration_seconds")

    avg_literal = _average(literal_values)
    total_literal = sum(literal_values)
    total_matched = sum(matched_values)
    savings_basis = total_literal + total_matched

    # Prefer a live filesystem reading; fall back to the capacity recorded by
    # the most recent run that has one.
    capacity = _capacity_from_system(global_config) or _latest_capacity_from_runs(real_runs) or {}
    available = _int_at(capacity, "available_bytes")
    daily_literal = _average_daily_literal(real_runs)

    return {
        "runs_sampled": len(real_runs),
        "avg_duration_seconds": _average(duration_values),
        "avg_daily_literal_data_bytes": daily_literal,
        "avg_literal_data_bytes": avg_literal,
        "total_literal_data_bytes": total_literal,
        "total_matched_data_bytes": total_matched,
        "link_dest_savings_ratio": round(total_matched / savings_basis, 4) if savings_basis else None,
        "estimated_runs_until_full": int(available / avg_literal) if available and avg_literal else None,
        "estimated_days_until_full": int(available / daily_literal) if available and daily_literal else None,
        "capacity": capacity,
    }


def collect_host_stats(*, host: HostConfig, limit: int = 8) -> dict[str, Any]:
    """Summarise the most recent successful runs of a single host.

    Inspects up to 50 successful runs (newest first), drops dry runs and runs
    without a stats payload, and keeps at most ``limit`` of the remainder.

    Args:
        host: Host whose ``runs`` and ``snapshots`` relations are queried.
        limit: Maximum number of run summaries to keep.

    Returns:
        A dict with the run summaries (each extended with bar percentages),
        the latest run, the latest snapshot summary and literal/matched
        byte aggregates.
    """
    runs = list(
        host.runs.select_related("snapshot")
        .filter(status=BackupRun.Status.SUCCESS)
        .order_by("-started_at", "-created_at")[:50]
    )
    real_runs = [_run_summary(run) for run in runs if _is_real_run(run)]
    real_runs = [run for run in real_runs if run["has_stats"]][:limit]

    latest_snapshot = host.snapshots.order_by("-started_at", "-discovered_at", "-id").first()
    latest_snapshot_stats = _snapshot_summary(latest_snapshot) if latest_snapshot else {}

    literal_values = _int_values(real_runs, "rsync", "literal_data_bytes")
    matched_values = _int_values(real_runs, "rsync", "matched_data_bytes")
    # The maxima are the 100% reference for the per-run bar percentages.
    max_literal = max(literal_values) if literal_values else 0
    max_matched = max(matched_values) if matched_values else 0

    return {
        "runs": [
            _with_bar_percentages(run, max_literal=max_literal, max_matched=max_matched)
            for run in real_runs
        ],
        "latest_run": real_runs[0] if real_runs else {},
        "latest_snapshot": latest_snapshot_stats,
        "avg_literal_data_bytes": _average(literal_values),
        "avg_daily_literal_data_bytes": _average_daily_literal(real_runs),
        "total_literal_data_bytes": sum(literal_values),
        "total_matched_data_bytes": sum(matched_values),
    }


def _run_summary(run: BackupRun) -> dict[str, Any]:
    """Flatten a run and the ``stats`` section of its result into a plain dict.

    A missing or non-dict ``result``, ``stats``, ``rsync`` or ``storage``
    value is treated as an empty dict; ``has_stats`` is false when the stats
    section is empty.
    """
    result = run.result if isinstance(run.result, dict) else {}
    stats = _dict_at(result, "stats")
    return {
        "id": run.id,
        "host": run.host.host,
        "run_type": run.run_type,
        "started_at": run.started_at,
        "ended_at": run.ended_at,
        "snapshot": run.snapshot,
        "snapshot_path": run.snapshot_path,
        "has_stats": bool(stats),
        "duration_seconds": _int_at(stats, "duration_seconds"),
        "rsync": _dict_at(stats, "rsync"),
        "storage": _dict_at(stats, "storage"),
    }


def _snapshot_summary(snapshot: SnapshotRecord | None) -> dict[str, Any]:
    """Return identifying fields and storage sizes of a snapshot.

    Sizes are read from ``metadata["stats"]["storage"]["snapshot"]`` and are
    ``None`` when absent. Returns an empty dict when ``snapshot`` is ``None``.
    """
    if snapshot is None:
        return {}
    metadata = snapshot.metadata if isinstance(snapshot.metadata, dict) else {}
    snapshot_storage = _dict_at(metadata, "stats", "storage", "snapshot")
    return {
        "id": snapshot.id,
        "dirname": snapshot.dirname,
        "kind": snapshot.kind,
        "status": snapshot.status,
        "started_at": snapshot.started_at,
        "apparent_size_bytes": _int_at(snapshot_storage, "apparent_size_bytes"),
        "allocated_size_bytes": _int_at(snapshot_storage, "allocated_size_bytes"),
        "hardlinked_files": _int_at(snapshot_storage, "hardlinked_files"),
    }


def _is_real_run(run: BackupRun) -> bool:
    """Return ``False`` when the run's result marks it as a dry run.

    A run counts as a dry run when ``result["dry_run"]`` or
    ``result["requested"]["dry_run"]`` is exactly ``True``.
    """
    result = run.result if isinstance(run.result, dict) else {}
    if result.get("dry_run") is True:
        return False
    requested = result.get("requested") if isinstance(result.get("requested"), dict) else {}
    return requested.get("dry_run") is not True


def _capacity_from_system(global_config: GlobalConfig | None) -> dict[str, Any]:
    """Return the filesystem capacity for the configured backup root.

    Returns an empty dict when there is no configuration or no backup root.
    """
    if global_config is None or not global_config.backup_root:
        return {}
    return filesystem_capacity(Path(global_config.backup_root))


def _latest_capacity_from_runs(runs: list[dict[str, Any]]) -> dict[str, Any]:
    """Return the first non-empty ``storage.capacity`` dict found in ``runs``.

    Callers pass runs newest-first, so this is the most recent recorded
    capacity. Returns an empty dict when no run has one.
    """
    for run in runs:
        capacity = _dict_at(run, "storage", "capacity")
        if capacity:
            return capacity
    return {}


def _average(values: list[int]) -> int | None:
    """Return the mean of ``values`` truncated to an int, or ``None`` if empty."""
    if not values:
        return None
    return int(sum(values) / len(values))


def _average_daily_literal(runs: list[dict[str, Any]]) -> int | None:
    """Return the literal bytes transferred per day across ``runs``.

    The total of all literal byte counts is divided by the time span between
    the oldest and newest ``started_at``, with a minimum span of one day.
    With fewer than two timestamps the plain per-run average is returned.
    Returns ``None`` when no run has a literal byte count.
    """
    values = _int_values(runs, "rsync", "literal_data_bytes")
    if not values:
        return None
    timestamps = [run["started_at"] for run in runs if run.get("started_at") is not None]
    if len(timestamps) < 2:
        return _average(values)
    oldest = min(timestamps)
    newest = max(timestamps)
    if timezone.is_naive(oldest):
        oldest = timezone.make_aware(oldest)
    if timezone.is_naive(newest):
        newest = timezone.make_aware(newest)
    # Clamp to one day so runs clustered within a few hours do not inflate
    # the daily rate.
    span_days = max((newest - oldest).total_seconds() / 86400, 1)
    return int(sum(values) / span_days)


def _with_bar_percentages(run: dict[str, Any], *, max_literal: int, max_matched: int) -> dict[str, Any]:
    """Return a shallow copy of ``run`` with ``literal_percent`` and ``matched_percent`` added.

    Each percentage is the run's byte count relative to the given maximum.
    """
    run = dict(run)
    literal = _int_at(run, "rsync", "literal_data_bytes") or 0
    matched = _int_at(run, "rsync", "matched_data_bytes") or 0
    run["literal_percent"] = _percentage(literal, max_literal)
    run["matched_percent"] = _percentage(matched, max_matched)
    return run


def _percentage(value: int, maximum: int) -> int:
    """Return ``value`` as an integer percentage of ``maximum``.

    Returns 0 when either argument is not positive; otherwise the result is
    clamped to the range 1..100, so any positive value yields at least 1.
    """
    if maximum <= 0 or value <= 0:
        return 0
    return max(1, min(100, int(value / maximum * 100)))


def _int_values(items: list[dict[str, Any]], *keys: str) -> list[int]:
    """Collect the integer found at ``keys`` in each item, skipping items without one."""
    found = (_int_at(item, *keys) for item in items)
    return [value for value in found if value is not None]


def _dict_at(data: dict[str, Any], *keys: str) -> dict[str, Any]:
    """Follow ``keys`` through nested dicts and return the dict found there.

    Returns an empty dict when any step is missing or is not a dict.
    """
    value: Any = data
    for key in keys:
        if not isinstance(value, dict):
            return {}
        value = value.get(key)
    return value if isinstance(value, dict) else {}


def _int_at(data: dict[str, Any], *keys: str) -> int | None:
    """Follow ``keys`` through nested dicts and return the number found as an int.

    Floats are truncated. Returns ``None`` when any step is missing, when the
    value is a bool or not a number, or when it is a non-finite float.
    """
    value: Any = data
    for key in keys:
        if not isinstance(value, dict):
            return None
        value = value.get(key)
    # bool is a subclass of int; a flag is not a measurement.
    if isinstance(value, bool):
        return None
    if isinstance(value, int):
        return value
    if isinstance(value, float):
        # int() raises ValueError on NaN and OverflowError on infinity, so
        # treat non-finite values as missing instead of crashing.
        return int(value) if math.isfinite(value) else None
    return None