(feature) Add backup trend forecasting to the Django UI

Extend derived backup statistics with average daily new data and an
estimated days-until-full forecast based on recent successful real runs.

Show the new forecast metrics on the dashboard and add compact per-run
trend bars on the host detail page so new data and matched link-dest data
are easier to compare at a glance.

Keep the implementation migration-free by deriving everything from the
existing BackupRun result stats payload.
This commit is contained in:
2026-05-19 22:39:46 +02:00
parent fc22842fc4
commit e8169eae42
5 changed files with 103 additions and 1 deletions

View File

@@ -3,6 +3,8 @@ from __future__ import annotations
from pathlib import Path
from typing import Any, Iterable
from django.utils import timezone
from pobsync.run_stats import filesystem_capacity
from .models import BackupRun, GlobalConfig, HostConfig, SnapshotRecord
@@ -33,15 +35,18 @@ def collect_dashboard_stats(*, hosts: Iterable[HostConfig], global_config: Globa
savings_basis = total_literal + total_matched
capacity = _capacity_from_system(global_config) or _latest_capacity_from_runs(real_runs) or {}
available = _int_at(capacity, "available_bytes")
daily_literal = _average_daily_literal(real_runs)
return {
"runs_sampled": len(real_runs),
"avg_duration_seconds": _average(duration_values),
"avg_daily_literal_data_bytes": daily_literal,
"avg_literal_data_bytes": avg_literal,
"total_literal_data_bytes": total_literal,
"total_matched_data_bytes": total_matched,
"link_dest_savings_ratio": round(total_matched / savings_basis, 4) if savings_basis else None,
"estimated_runs_until_full": int(available / avg_literal) if available and avg_literal else None,
"estimated_days_until_full": int(available / daily_literal) if available and daily_literal else None,
"capacity": capacity,
}
@@ -57,12 +62,15 @@ def collect_host_stats(*, host: HostConfig, limit: int = 8) -> dict[str, Any]:
literal_values = [value for value in literal_values if value is not None]
matched_values = [_int_at(run, "rsync", "matched_data_bytes") for run in real_runs]
matched_values = [value for value in matched_values if value is not None]
max_literal = max(literal_values) if literal_values else 0
max_matched = max(matched_values) if matched_values else 0
return {
"runs": real_runs,
"runs": [_with_bar_percentages(run, max_literal=max_literal, max_matched=max_matched) for run in real_runs],
"latest_run": real_runs[0] if real_runs else {},
"latest_snapshot": latest_snapshot_stats,
"avg_literal_data_bytes": _average(literal_values),
"avg_daily_literal_data_bytes": _average_daily_literal(real_runs),
"total_literal_data_bytes": sum(literal_values),
"total_matched_data_bytes": sum(matched_values),
}
@@ -132,6 +140,41 @@ def _average(values: list[int]) -> int | None:
return int(sum(values) / len(values))
def _average_daily_literal(runs: list[dict[str, Any]]) -> int | None:
values = [_int_at(run, "rsync", "literal_data_bytes") for run in runs]
values = [value for value in values if value is not None]
if not values:
return None
timestamps = [run["started_at"] for run in runs if run.get("started_at") is not None]
if len(timestamps) < 2:
return _average(values)
oldest = min(timestamps)
newest = max(timestamps)
if timezone.is_naive(oldest):
oldest = timezone.make_aware(oldest)
if timezone.is_naive(newest):
newest = timezone.make_aware(newest)
span_days = max((newest - oldest).total_seconds() / 86400, 1)
return int(sum(values) / span_days)
def _with_bar_percentages(run: dict[str, Any], *, max_literal: int, max_matched: int) -> dict[str, Any]:
run = dict(run)
literal = _int_at(run, "rsync", "literal_data_bytes") or 0
matched = _int_at(run, "rsync", "matched_data_bytes") or 0
run["literal_percent"] = _percentage(literal, max_literal)
run["matched_percent"] = _percentage(matched, max_matched)
return run
def _percentage(value: int, maximum: int) -> int:
if maximum <= 0 or value <= 0:
return 0
return max(1, min(100, int(value / maximum * 100)))
def _dict_at(data: dict[str, Any], *keys: str) -> dict[str, Any]:
value: Any = data
for key in keys: