8 Commits

Author SHA1 Message Date
fdf401a0be Merge pull request '(refactor) Unify run progress panels' (#57) from issue-52-live-normal-runs into master
Reviewed-on: #57
2026-05-23 00:48:22 +02:00
3b77f2e5d0 (refactor) Unify run progress panels
Use a shared Run Progress presentation for dry-runs and normal backup
runs so live run feedback is consistent across run types.

Keep mode-specific metrics while aligning status, mode, log, and warning
layout.

Refs #52
2026-05-23 00:46:52 +02:00
df9ec5b04c Merge pull request 'issue-54-worker-rsync-state' (#56) from issue-54-worker-rsync-state into master
Reviewed-on: #56
2026-05-23 00:39:47 +02:00
5788f53854 (bugfix) Keep rsync runner callback optional
Only pass the process_started hook when live run state tracking is active,
so existing rsync call sites and tests without that hook remain compatible.

Refs #54
2026-05-23 00:31:24 +02:00
28da9c4096 (bugfix) Track rsync process state for running backups
Record rsync process pid and execution phase while normal backup runs are
active so the worker can reconcile stale running rows when rsync has
already disappeared.

Keep finalizing runs out of the missing-process path to avoid marking
slow post-rsync stats collection as a failed transfer.

Closes #54
2026-05-23 00:26:22 +02:00
6eb1b4add3 (bugfix) Reconcile real rsync failures from worker logs
Record live rsync log paths for normal backup runs so the worker can
recover stale running state after terminal rsync errors.

Treat rsync vanished-file exit code 24 as a warning and keep the
completed snapshot instead of failing the run into incomplete state.

Closes #54
2026-05-23 00:23:14 +02:00
8633cbea26 Merge pull request '(bugfix) Quote remote preflight shell commands' (#46) from issue-45-preflight-shell-quoting into master
Reviewed-on: #46
2026-05-21 15:45:46 +02:00
3fb8209aef (bugfix) Quote remote preflight shell commands
Pass remote rsync and source-root preflight checks as a single quoted
shell command to SSH so the remote shell evaluates command -v and test
expressions reliably.

Refs #45
2026-05-21 15:44:46 +02:00
11 changed files with 704 additions and 23 deletions

View File

@@ -23,6 +23,7 @@ from ..util import ensure_dir, realpath_startswith, sanitize_host, write_yaml_at
DEFAULT_DRY_RUN_TIMEOUT_SECONDS = 900
RSYNC_PARTIAL_VANISHED_EXIT_CODE = 24
def dry_run_log_path(host: str, run_id: int | None = None) -> Path:
@@ -72,6 +73,24 @@ def classify_rsync_failure(exit_code: int | None, log_tail: list[str]) -> dict[s
}
def classify_rsync_warning(exit_code: int | None, log_tail: list[str]) -> dict[str, str] | None:
joined_tail = "\n".join(log_tail).lower()
if exit_code == RSYNC_PARTIAL_VANISHED_EXIT_CODE:
return {
"category": "vanished",
"message": "Some source files vanished during rsync.",
"hint": "This is common on live systems. The snapshot was kept, but review the rsync log if this happens often.",
}
if exit_code in (None, RSYNC_PARTIAL_VANISHED_EXIT_CODE) and (
"file has vanished" in joined_tail or "vanished before it could be transferred" in joined_tail
):
return {
"category": "vanished",
"message": "Some source files vanished during rsync.",
"hint": "This is common on live systems. The snapshot was kept, but review the rsync log if this happens often.",
}
return None
def _collect_run_stats(
*,
log_path: Path,
@@ -158,6 +177,7 @@ def run_scheduled(
run_id: int | None = None,
cancel_check: Callable[[], bool] | None = None,
verbose_output: bool = False,
state_callback: Callable[[dict[str, Any]], None] | None = None,
) -> dict[str, Any]:
host = sanitize_host(host)
@@ -322,14 +342,59 @@ def run_scheduled(
log_path.touch(exist_ok=True)
write_yaml_atomic(meta_path, meta)
if state_callback is not None:
state_callback(
{
"status": "running",
"phase": "preparing",
"snapshot": str(incomplete_dir),
"log": str(log_path),
"rsync": {"command": cmd, "exit_code": None},
}
)
result = run_rsync(cmd, log_path=log_path, timeout_seconds=timeout_seconds, cancel_check=cancel_check)
def process_started(pid: int, pgid: int) -> None:
if state_callback is None:
return
state_callback(
{
"status": "running",
"phase": "rsync",
"snapshot": str(incomplete_dir),
"log": str(log_path),
"rsync": {"command": cmd, "exit_code": None, "pid": pid, "pgid": pgid},
}
)
run_rsync_kwargs: dict[str, Any] = {
"log_path": log_path,
"timeout_seconds": timeout_seconds,
"cancel_check": cancel_check,
}
if state_callback is not None:
run_rsync_kwargs["process_started"] = process_started
result = run_rsync(cmd, **run_rsync_kwargs)
log_tail = _read_log_tail(log_path)
warning = classify_rsync_warning(result.exit_code, log_tail)
successful_or_warning = result.exit_code == 0 or warning is not None
if state_callback is not None:
state_callback(
{
"status": "running",
"phase": "finalizing",
"snapshot": str(incomplete_dir),
"log": str(log_path),
"rsync": {"command": cmd, "exit_code": result.exit_code, "log_tail": log_tail},
}
)
end_ts = utc_now()
meta["ended_at"] = format_iso_z(end_ts)
meta["duration_seconds"] = int((end_ts - ts).total_seconds())
meta["rsync"]["exit_code"] = result.exit_code
meta["status"] = "cancelled" if result.cancelled else ("success" if result.exit_code == 0 else "failed")
meta["status"] = "cancelled" if result.cancelled else ("warning" if warning else ("success" if result.exit_code == 0 else "failed"))
if warning is not None:
meta["warning"] = warning
meta["stats"] = _collect_run_stats(
log_path=log_path,
backup_root=Path(backup_root),
@@ -349,8 +414,7 @@ def run_scheduled(
"error": "rsync.log missing after execution",
}
if result.exit_code != 0:
log_tail = _read_log_tail(log_path)
if not successful_or_warning:
return {
"ok": False,
"dry_run": False,
@@ -404,7 +468,9 @@ def run_scheduled(
"snapshot": str(final_dir),
"base": str(base_dir) if base_dir else None,
"log": str(final_log_path),
"rsync": {"exit_code": result.exit_code},
"status": meta["status"],
"warning": warning,
"rsync": {"exit_code": result.exit_code, "log_tail": log_tail},
"verbose_output": bool(verbose_output),
"duration_seconds": meta["duration_seconds"],
"stats": meta["stats"],

View File

@@ -82,6 +82,7 @@ def run_rsync(
log_path: Path,
timeout_seconds: int,
cancel_check: Callable[[], bool] | None = None,
process_started: Callable[[int, int], None] | None = None,
) -> RsyncResult:
"""
Run rsync and always write stdout/stderr to log_path.
@@ -95,6 +96,8 @@ def run_rsync(
with log_path.open("ab") as f:
process = subprocess.Popen(command, stdout=f, stderr=subprocess.STDOUT, start_new_session=True)
if process_started is not None:
process_started(process.pid, os.getpgid(process.pid))
started = time.monotonic()
while True:
exit_code = process.poll()

View File

@@ -8,7 +8,13 @@ from pathlib import Path
from django.db import transaction
from django.utils import timezone
from pobsync.commands.run_scheduled import DEFAULT_DRY_RUN_TIMEOUT_SECONDS, classify_rsync_failure, dry_run_log_path, run_scheduled
from pobsync.commands.run_scheduled import (
DEFAULT_DRY_RUN_TIMEOUT_SECONDS,
classify_rsync_failure,
classify_rsync_warning,
dry_run_log_path,
run_scheduled,
)
from pobsync_backend.config_source import DjangoConfigSource
from pobsync_backend.models import BackupRun, HostConfig
from pobsync_backend.retention import run_sql_retention_apply
@@ -66,6 +72,7 @@ def execute_backup_run(
run_id=run.id,
cancel_check=lambda: _run_cancel_requested(run.id),
verbose_output=bool(dry_run or verbose_output),
state_callback=lambda state: _record_running_state(run.id, state),
)
except Exception as exc:
run.refresh_from_db()
@@ -83,6 +90,8 @@ def execute_backup_run(
run.refresh_from_db()
if result.get("cancelled") or run.status == BackupRun.Status.CANCELLED:
run.status = BackupRun.Status.CANCELLED
elif result.get("status") == BackupRun.Status.WARNING:
run.status = BackupRun.Status.WARNING
else:
run.status = BackupRun.Status.SUCCESS if result.get("ok") else BackupRun.Status.FAILED
run.ended_at = timezone.now()
@@ -201,11 +210,98 @@ def _run_cancel_requested(run_id: int) -> bool:
return False
def _record_running_state(run_id: int, state: dict[str, object]) -> None:
    """Persist a live-state snapshot reported by the runner for a running backup.

    Merges the reported phase, log path, snapshot path and rsync details into
    the run's stored result and refreshes the worker heartbeat fields. Does
    nothing when the run no longer exists or is not in RUNNING status.
    """

    def as_dict(value: object) -> dict:
        # Anything that is not a dict is treated as "nothing stored yet".
        return value if isinstance(value, dict) else {}

    try:
        run = BackupRun.objects.only("id", "status", "result", "snapshot_path", "rsync_exit_code").get(id=run_id)
    except BackupRun.DoesNotExist:
        return
    if run.status != BackupRun.Status.RUNNING:
        return

    stored = as_dict(run.result)
    execution = as_dict(stored.get("execution"))
    previous_rsync = as_dict(stored.get("rsync"))
    reported_rsync = as_dict(state.get("rsync"))

    # Only non-empty strings overwrite what was recorded earlier.
    for key in ("phase", "log", "snapshot"):
        value = state.get(key)
        if isinstance(value, str) and value:
            execution[key] = value
            if key == "snapshot":
                run.snapshot_path = value

    if reported_rsync:
        # Newly reported rsync fields win over previously stored ones.
        stored["rsync"] = {**previous_rsync, **reported_rsync}
        reported_exit = reported_rsync.get("exit_code")
        if isinstance(reported_exit, int):
            run.rsync_exit_code = reported_exit

    heartbeat = {
        "worker_pid": os.getpid(),
        "worker_host": socket.gethostname(),
        "heartbeat_at": timezone.now().isoformat(),
    }
    stored["execution"] = {**execution, **heartbeat}
    run.result = stored
    run.save(update_fields=["snapshot_path", "rsync_exit_code", "result"])
def _reconcile_running_run(*, run: BackupRun, grace_seconds: int, stale_worker_seconds: int) -> bool:
result = run.result if isinstance(run.result, dict) else {}
requested = result.get("requested") if isinstance(result.get("requested"), dict) else {}
log_path = _execution_log_path(result)
log_tail = _read_log_tail(log_path) if log_path is not None else []
terminal_log = _terminal_rsync_log(log_tail)
exit_code = _exit_code_from_log(log_tail)
stale_worker = _running_worker_timed_out(run=run, stale_worker_seconds=stale_worker_seconds)
if not requested.get("dry_run"):
if terminal_log:
failure = classify_rsync_failure(exit_code or 255, log_tail)
result.update(
{
"ok": False,
"host": run.host.host,
"log": str(log_path) if log_path else "",
"failure": failure,
"rsync": {
**(result.get("rsync") if isinstance(result.get("rsync"), dict) else {}),
"exit_code": exit_code or 255,
"log_tail": log_tail,
},
}
)
run.status = BackupRun.Status.FAILED
run.ended_at = timezone.now()
run.rsync_exit_code = exit_code or 255
run.result = result
run.save(update_fields=["status", "ended_at", "rsync_exit_code", "result"])
return True
if _running_rsync_process_missing(run=run, grace_seconds=grace_seconds):
result.update(
{
"ok": False,
"host": run.host.host,
"log": str(log_path) if log_path else "",
"failure": {
"category": "rsync_process",
"message": "The rsync process is no longer running while the backup is still marked running.",
"hint": "Check the rsync log and pobsync-worker.service logs before retrying the backup.",
},
"rsync": {
**(result.get("rsync") if isinstance(result.get("rsync"), dict) else {}),
"exit_code": exit_code or 255,
"log_tail": log_tail,
},
}
)
run.status = BackupRun.Status.FAILED
run.ended_at = timezone.now()
run.rsync_exit_code = exit_code or 255
run.result = result
run.save(update_fields=["status", "ended_at", "rsync_exit_code", "result"])
return True
if stale_worker:
result.update(
{
@@ -225,14 +321,11 @@ def _reconcile_running_run(*, run: BackupRun, grace_seconds: int, stale_worker_s
return True
return False
log_path = _execution_log_path(result)
log_tail = _read_log_tail(log_path) if log_path is not None else []
terminal_log = _terminal_rsync_log(log_tail)
timed_out = _running_dry_run_timed_out(run=run, grace_seconds=grace_seconds)
if not terminal_log and not timed_out and not stale_worker:
return False
exit_code = _exit_code_from_log(log_tail) or (124 if timed_out or stale_worker else 255)
exit_code = exit_code or (124 if timed_out or stale_worker else 255)
failure = classify_rsync_failure(exit_code, log_tail)
if stale_worker and not terminal_log:
failure = {
@@ -305,6 +398,9 @@ def _read_log_tail(log_path: Path | None, *, max_lines: int = 40) -> list[str]:
def _terminal_rsync_log(log_tail: list[str]) -> bool:
    """Return True when the log tail shows rsync ended with a real error.

    A tail that classifies as a warning is never terminal, even if it also
    contains "rsync error:" lines.
    """
    detected_code = _exit_code_from_log(log_tail)
    if classify_rsync_warning(detected_code, log_tail) is not None:
        return False
    for entry in log_tail:
        if entry.startswith("rsync error:"):
            return True
    return False
@@ -312,6 +408,8 @@ def _exit_code_from_log(log_tail: list[str]) -> int | None:
for line in reversed(log_tail):
if "code 255" in line:
return 255
if "code 24" in line:
return 24
if "code 124" in line:
return 124
if "code 12" in line:
@@ -342,6 +440,33 @@ def _running_worker_timed_out(*, run: BackupRun, stale_worker_seconds: int) -> b
return timezone.now() >= heartbeat_at + timedelta(seconds=stale_worker_seconds)
def _running_rsync_process_missing(*, run: BackupRun, grace_seconds: int) -> bool:
    """Return True when a run in the "rsync" phase has lost its rsync process.

    The check only applies when a positive grace period is configured, the
    recorded execution phase is "rsync", a positive integer pid was recorded,
    and the grace period since the last heartbeat (or the run start, when no
    heartbeat is stored) has elapsed.
    """
    if grace_seconds <= 0:
        return False

    payload = run.result if isinstance(run.result, dict) else {}
    execution = payload.get("execution")
    if not isinstance(execution, dict):
        execution = {}
    # Other phases are deliberately ignored here.
    if execution.get("phase") != "rsync":
        return False

    rsync_state = payload.get("rsync")
    pid = rsync_state.get("pid") if isinstance(rsync_state, dict) else None
    if not (isinstance(pid, int) and pid > 0):
        return False

    last_seen = _parse_iso_datetime(execution.get("heartbeat_at")) or run.started_at
    if last_seen is None:
        return False
    grace_deadline = last_seen + timedelta(seconds=grace_seconds)
    if timezone.now() < grace_deadline:
        return False

    return not _process_exists(pid)
def _process_exists(pid: int) -> bool:
    """Report whether a process with the given pid currently exists.

    Args:
        pid: Process id to probe.

    Returns:
        True when the process exists (including when it exists but this
        process lacks permission to signal it), False otherwise.
    """
    # Zero and negative values address process groups in kill(2), not a
    # single process, so they can never identify "the" rsync process.
    if pid <= 0:
        return False
    try:
        # Signal 0 performs the existence/permission check without
        # delivering a signal.
        os.kill(pid, 0)
    except ProcessLookupError:
        return False
    except PermissionError:
        # The process exists but belongs to another user.
        return True
    except OverflowError:
        # The value does not fit a pid_t, so no such process can exist.
        return False
    return True
def _parse_iso_datetime(value: object):
if not isinstance(value, str) or not value:
return None

View File

@@ -97,9 +97,7 @@ def run_remote_preflight(host: HostConfig, *, timeout_seconds: int = 20) -> dict
*ssh_cmd,
"-oBatchMode=yes",
target,
"sh",
"-lc",
f"command -v {shlex.quote(rsync_binary)} >/dev/null",
_remote_shell_command(f"command -v {shlex.quote(rsync_binary)} >/dev/null"),
],
timeout_seconds=timeout_seconds,
),
@@ -109,9 +107,7 @@ def run_remote_preflight(host: HostConfig, *, timeout_seconds: int = 20) -> dict
*ssh_cmd,
"-oBatchMode=yes",
target,
"sh",
"-lc",
f"test -e {shlex.quote(source_root)} && test -r {shlex.quote(source_root)}",
_remote_shell_command(f"test -e {shlex.quote(source_root)} && test -r {shlex.quote(source_root)}"),
],
timeout_seconds=timeout_seconds,
),
@@ -129,6 +125,10 @@ def run_remote_preflight(host: HostConfig, *, timeout_seconds: int = 20) -> dict
return result
def _remote_shell_command(script: str) -> str:
    """Return `script` wrapped as a single `sh -lc <quoted script>` string."""
    quoted_script = shlex.quote(script)
    return " ".join(("sh", "-lc", quoted_script))
def effective_host_config_preview(host: HostConfig, global_config: GlobalConfig) -> dict[str, Any]:
config = build_effective_config(global_config_object_data(global_config), host_config_object_data(host))
credential = host.ssh_credential or global_config.default_ssh_credential

View File

@@ -559,6 +559,15 @@
gap: 8px;
margin-bottom: 14px;
}
.refresh-controls {
align-items: center;
display: flex;
gap: 14px;
justify-content: space-between;
}
.refresh-controls h2 {
margin-bottom: 4px;
}
.trend-bars {
display: grid;
gap: 5px;
@@ -837,6 +846,10 @@
.page-header .actions { justify-content: flex-start; }
.two-col,
.panel-grid { grid-template-columns: 1fr; }
.refresh-controls {
align-items: stretch;
display: grid;
}
.dashboard-priority-grid { grid-template-columns: 1fr; }
.host-control-grid { grid-template-columns: 1fr; }
.schedule-row { grid-template-columns: 1fr; }
@@ -922,8 +935,20 @@
</main>
<script>
(() => {
// Sync the state label and the pause/resume button with the region's
// data-refresh-paused / data-refresh-active attributes.
const updateRefreshControls = (region) => {
  const isPaused = region.dataset.refreshPaused === "true";
  const isActive = region.dataset.refreshActive === "true";
  const stateLabel = document.querySelector(`[data-refresh-state="${region.id}"]`);
  const toggleButton = document.querySelector(`[data-refresh-toggle][data-refresh-target="${region.id}"]`);

  if (stateLabel) {
    let stateText = "off";
    if (isPaused) {
      stateText = "paused";
    } else if (isActive) {
      stateText = "on";
    }
    stateLabel.textContent = stateText;
  }

  if (!toggleButton) return;
  toggleButton.textContent = isPaused ? "Resume refresh" : "Pause refresh";
  // The button is only usable while refreshing is active or paused.
  toggleButton.disabled = !(isActive || isPaused);
};
const refreshRegion = async (region) => {
if (region.dataset.refreshActive !== "true" || document.hidden) return;
if (region.dataset.refreshActive !== "true" || region.dataset.refreshPaused === "true" || document.hidden) return;
try {
const response = await fetch(region.dataset.refreshUrl, {
credentials: "same-origin",
@@ -933,13 +958,26 @@
region.innerHTML = await response.text();
const refreshActive = response.headers.get("X-Pobsync-Refresh-Active");
if (refreshActive) region.dataset.refreshActive = refreshActive;
updateRefreshControls(region);
} catch (error) {
// Keep the current server-rendered content visible if a refresh fails.
}
};
document.addEventListener("click", (event) => {
const toggle = event.target.closest("[data-refresh-toggle]");
if (!toggle) return;
const region = document.getElementById(toggle.dataset.refreshTarget);
if (!region) return;
const paused = region.dataset.refreshPaused === "true";
region.dataset.refreshPaused = paused ? "false" : "true";
if (paused && region.dataset.refreshActive === "true") refreshRegion(region);
updateRefreshControls(region);
});
document.querySelectorAll("[data-refresh-url]").forEach((region) => {
const interval = Number.parseInt(region.dataset.refreshInterval || "5000", 10);
updateRefreshControls(region);
window.setInterval(() => refreshRegion(region), Number.isFinite(interval) ? interval : 5000);
});
})();

View File

@@ -59,9 +59,13 @@
{% if dry_run_summary %}
<section class="panel highlight {{ dry_run_summary.highlight_class }}">
<h2>Dry Run Summary</h2>
<section class="grid" aria-label="Dry run summary">
<h2>Run Progress</h2>
<section class="grid" aria-label="Run progress">
<div class="metric"><div class="label">Status</div><div class="value">{{ dry_run_summary.status }}</div></div>
<div class="metric">
<div class="label">Mode</div>
<div class="value">dry run</div>
</div>
<div class="metric"><div class="label">Files Seen</div><div class="value">{{ dry_run_summary.files_seen|default:"unknown" }}</div></div>
<div class="metric"><div class="label">Would Transfer</div><div class="value">{{ dry_run_summary.files_would_transfer|default:"unknown" }}</div></div>
<div class="metric"><div class="label">Transfer Estimate</div><div class="value">{{ dry_run_summary.transfer_estimate_bytes|filesizeformat }}</div></div>
@@ -96,6 +100,74 @@
</section>
{% endif %}
{% if live_progress %}
<section class="panel highlight running">
<h2>Run Progress</h2>
<section class="grid" aria-label="Run progress">
<div class="metric">
<div class="label">Status</div>
<div class="value">{{ run.status }}</div>
</div>
<div class="metric">
<div class="label">Mode</div>
<div class="value">backup</div>
</div>
<div class="metric">
<div class="label">Phase</div>
<div class="value">{{ live_progress.phase }}</div>
</div>
<div class="metric">
<div class="label">Rsync PID</div>
<div class="value">{{ live_progress.rsync_pid|default:"" }}</div>
</div>
<div class="metric">
<div class="label">Log Updated</div>
<div class="value">
{% if live_progress.log.exists %}
{{ live_progress.log.seconds_since_modified }}s ago
{% else %}
missing
{% endif %}
</div>
</div>
<div class="metric">
<div class="label">Log Size</div>
<div class="value">{{ live_progress.log.size_bytes|filesizeformat }}</div>
</div>
{% if live_progress.snapshot.exists %}
<div class="metric">
<div class="label">Data Files</div>
<div class="value">{% if live_progress.snapshot.scan_limited %}at least {% endif %}{{ live_progress.snapshot.files }}</div>
</div>
<div class="metric">
<div class="label">Data Size</div>
<div class="value">{% if live_progress.snapshot.scan_limited %}at least {% endif %}{{ live_progress.snapshot.apparent_size_bytes|filesizeformat }}</div>
</div>
{% endif %}
</section>
<div class="stack">
{% if live_progress.snapshot.path %}
<div><strong>Snapshot path:</strong> {{ live_progress.snapshot.path }}</div>
{% endif %}
{% if live_progress.snapshot.scan_limited %}
<div class="muted">Progress scan was capped to keep the UI responsive.</div>
{% endif %}
{% if live_progress.log.path %}
<div>
<strong>Log:</strong>
{% if live_progress.log.exists %}
<a href="{% url 'run_rsync_log' run.id %}">Open full rsync log</a>
{% else %}
<span class="muted">{{ live_progress.log.path }} (missing)</span>
{% endif %}
</div>
<div><strong>Log path:</strong> {{ live_progress.log.path }}</div>
{% endif %}
<div><strong>Warnings:</strong> none recorded</div>
</div>
</section>
{% endif %}
<div class="two-col">
<section class="panel">
<h2>Timing</h2>

View File

@@ -14,10 +14,22 @@
</section>
</header>
{% if can_auto_refresh %}
<section class="panel refresh-controls" aria-label="Live refresh controls">
<div>
<h2>Live Updates</h2>
<p class="muted">Auto-refresh is <strong data-refresh-state="run-live-region">on</strong> while this run is active.</p>
</div>
<button type="button" class="secondary" data-refresh-toggle data-refresh-target="run-live-region">Pause refresh</button>
</section>
{% endif %}
<div
id="run-live-region"
data-refresh-url="{% url 'run_detail_live' run.id %}"
data-refresh-interval="5000"
data-refresh-active="{{ can_auto_refresh|yesno:'true,false' }}"
data-refresh-paused="false"
aria-live="polite"
>
{% include "pobsync_backend/partials/run_detail_live.html" %}

View File

@@ -85,6 +85,37 @@ class BackupWorkerTests(TestCase):
self.assertEqual(SnapshotRecord.objects.count(), 1)
self.assertEqual(run.snapshot, SnapshotRecord.objects.get())
def test_worker_records_warning_status_from_completed_run(self) -> None:
with TemporaryDirectory() as tmp:
backup_root = Path(tmp) / "backups"
GlobalConfig.objects.create(name="default", backup_root=str(backup_root))
host = HostConfig.objects.create(host="web-01", address="web-01.example.test")
snapshot_dir = backup_root / host.host / "scheduled" / "20260519-021500Z__ABCDEFGH"
meta_dir = snapshot_dir / "meta"
meta_dir.mkdir(parents=True)
write_yaml_atomic(meta_dir / "meta.yaml", {"status": "warning", "started_at": "2026-05-19T02:15:00Z"})
run = queue_backup_run(host=host)
with patch("pobsync_backend.backup_runner.run_scheduled") as run_scheduled:
run_scheduled.return_value = {
"ok": True,
"status": "warning",
"dry_run": False,
"host": host.host,
"snapshot": str(snapshot_dir),
"base": None,
"warning": {"category": "vanished"},
"rsync": {"exit_code": 24},
}
count = Command()._run_once(prefix=Path(tmp) / "home")
self.assertEqual(count, 1)
run.refresh_from_db()
self.assertEqual(run.status, BackupRun.Status.WARNING)
self.assertEqual(run.rsync_exit_code, 24)
self.assertEqual(run.result["warning"]["category"], "vanished")
def test_worker_refreshes_heartbeat_while_run_is_active(self) -> None:
with TemporaryDirectory() as tmp:
GlobalConfig.objects.create(name="default", backup_root=str(Path(tmp) / "backups"))
@@ -116,6 +147,44 @@ class BackupWorkerTests(TestCase):
run_scheduled.side_effect = fake_run_scheduled
Command()._run_once(prefix=Path(tmp) / "home")
def test_worker_records_real_run_log_path_while_running(self) -> None:
with TemporaryDirectory() as tmp:
GlobalConfig.objects.create(name="default", backup_root=str(Path(tmp) / "backups"))
host = HostConfig.objects.create(host="web-01", address="web-01.example.test")
run = queue_backup_run(host=host)
snapshot_dir = Path(tmp) / "backups" / host.host / ".incomplete" / "20260519-021500Z__ABCDEFGH"
log_path = snapshot_dir / "meta" / "rsync.log"
with patch("pobsync_backend.backup_runner.run_scheduled") as run_scheduled:
def fake_run_scheduled(**kwargs):
kwargs["state_callback"](
{
"status": "running",
"phase": "rsync",
"snapshot": str(snapshot_dir),
"log": str(log_path),
"rsync": {"command": ["rsync"], "exit_code": None, "pid": 1234, "pgid": 1234},
}
)
run.refresh_from_db()
self.assertEqual(run.snapshot_path, str(snapshot_dir))
self.assertEqual(run.result["execution"]["phase"], "rsync")
self.assertEqual(run.result["execution"]["log"], str(log_path))
self.assertEqual(run.result["execution"]["snapshot"], str(snapshot_dir))
self.assertEqual(run.result["rsync"]["command"], ["rsync"])
self.assertEqual(run.result["rsync"]["pid"], 1234)
return {
"ok": True,
"dry_run": False,
"host": host.host,
"snapshot": "",
"base": None,
"rsync": {"exit_code": 0},
}
run_scheduled.side_effect = fake_run_scheduled
Command()._run_once(prefix=Path(tmp) / "home")
def test_worker_reconciles_stale_real_run_after_heartbeat_timeout(self) -> None:
host = HostConfig.objects.create(host="web-01", address="web-01.example.test")
run = queue_backup_run(host=host)
@@ -136,6 +205,97 @@ class BackupWorkerTests(TestCase):
self.assertEqual(run.result["failure"]["category"], "worker")
self.assertIn("heartbeat stopped", run.result["failure"]["message"])
def test_worker_reconciles_real_run_with_terminal_broken_pipe_log(self) -> None:
with TemporaryDirectory() as tmp:
host = HostConfig.objects.create(host="web-01", address="web-01.example.test")
run = queue_backup_run(host=host)
log_path = Path(tmp) / "backups" / host.host / ".incomplete" / "20260519-021500Z__ABCDEFGH" / "meta" / "rsync.log"
log_path.parent.mkdir(parents=True, exist_ok=True)
log_path.write_text(
"rsync error: unexplained error (code 255) at rsync.c(716) [generator=3.4.1]\n"
"rsync error: received SIGUSR1 (code 19) at main.c(1600) [receiver=3.4.1]\n"
"rsync: [generator] write error: Broken pipe (32)\n",
encoding="utf-8",
)
run.status = BackupRun.Status.RUNNING
run.started_at = timezone.now()
run.result["execution"] = {"log": str(log_path)}
run.save(update_fields=["status", "started_at", "result"])
reconciled = reconcile_running_runs()
self.assertEqual(reconciled, 1)
run.refresh_from_db()
self.assertEqual(run.status, BackupRun.Status.FAILED)
self.assertEqual(run.rsync_exit_code, 255)
self.assertEqual(run.result["failure"]["category"], "transport")
self.assertIn("Broken pipe", "\n".join(run.result["rsync"]["log_tail"]))
def test_worker_reconciles_real_run_when_rsync_process_disappears(self) -> None:
with TemporaryDirectory() as tmp:
host = HostConfig.objects.create(host="web-01", address="web-01.example.test")
run = queue_backup_run(host=host)
log_path = Path(tmp) / "backups" / host.host / ".incomplete" / "20260519-021500Z__ABCDEFGH" / "meta" / "rsync.log"
log_path.parent.mkdir(parents=True, exist_ok=True)
log_path.write_text("sending incremental file list\n", encoding="utf-8")
run.status = BackupRun.Status.RUNNING
run.started_at = timezone.now() - timedelta(minutes=10)
run.result["execution"] = {
"phase": "rsync",
"log": str(log_path),
"heartbeat_at": (timezone.now() - timedelta(minutes=10)).isoformat(),
}
run.result["rsync"] = {"pid": 999999, "pgid": 999999, "command": ["rsync"]}
run.save(update_fields=["status", "started_at", "result"])
reconciled = reconcile_running_runs(grace_seconds=300, stale_worker_seconds=24 * 60 * 60)
self.assertEqual(reconciled, 1)
run.refresh_from_db()
self.assertEqual(run.status, BackupRun.Status.FAILED)
self.assertEqual(run.result["failure"]["category"], "rsync_process")
self.assertEqual(run.rsync_exit_code, 255)
def test_worker_does_not_reconcile_missing_rsync_process_during_finalizing_phase(self) -> None:
host = HostConfig.objects.create(host="web-01", address="web-01.example.test")
run = queue_backup_run(host=host)
run.status = BackupRun.Status.RUNNING
run.started_at = timezone.now() - timedelta(minutes=10)
run.result["execution"] = {
"phase": "finalizing",
"heartbeat_at": (timezone.now() - timedelta(minutes=10)).isoformat(),
}
run.result["rsync"] = {"pid": 999999, "pgid": 999999, "command": ["rsync"], "exit_code": 0}
run.save(update_fields=["status", "started_at", "result"])
reconciled = reconcile_running_runs(grace_seconds=300, stale_worker_seconds=24 * 60 * 60)
self.assertEqual(reconciled, 0)
run.refresh_from_db()
self.assertEqual(run.status, BackupRun.Status.RUNNING)
def test_worker_does_not_fail_real_run_for_vanished_file_warning_log(self) -> None:
with TemporaryDirectory() as tmp:
host = HostConfig.objects.create(host="web-01", address="web-01.example.test")
run = queue_backup_run(host=host)
log_path = Path(tmp) / "backups" / host.host / ".incomplete" / "20260519-021500Z__ABCDEFGH" / "meta" / "rsync.log"
log_path.parent.mkdir(parents=True, exist_ok=True)
log_path.write_text(
"file has vanished: \"/var/lib/app/session\"\n"
"rsync warning: some files vanished before they could be transferred (code 24) at main.c(1338) [sender=3.4.1]\n",
encoding="utf-8",
)
run.status = BackupRun.Status.RUNNING
run.started_at = timezone.now()
run.result["execution"] = {"log": str(log_path)}
run.save(update_fields=["status", "started_at", "result"])
reconciled = reconcile_running_runs()
self.assertEqual(reconciled, 0)
run.refresh_from_db()
self.assertEqual(run.status, BackupRun.Status.RUNNING)
def test_worker_records_dry_run_log_path_while_running(self) -> None:
with TemporaryDirectory() as tmp:
GlobalConfig.objects.create(name="default", backup_root=str(Path(tmp) / "backups"))

View File

@@ -256,6 +256,71 @@ class RunScheduledConfigSourceTests(SimpleTestCase):
self.assertIn("stats:", meta_text)
self.assertIn("files_total: 10", meta_text)
def test_real_run_reports_running_state_callback_before_rsync_returns(self) -> None:
states = []
def fake_run_rsync(command, log_path, timeout_seconds, cancel_check=None, process_started=None):
self.assertEqual(len(states), 1)
self.assertEqual(states[0]["status"], "running")
self.assertEqual(states[0]["phase"], "preparing")
self.assertEqual(states[0]["log"], str(log_path))
self.assertEqual(states[0]["rsync"]["command"], command)
self.assertIsNotNone(process_started)
process_started(1234, 1234)
self.assertEqual(len(states), 2)
self.assertEqual(states[1]["phase"], "rsync")
self.assertEqual(states[1]["rsync"]["pid"], 1234)
self.assertEqual(states[1]["rsync"]["pgid"], 1234)
log_path.write_text("Number of files: 1\n", encoding="utf-8")
return RsyncResult(exit_code=0, command=command)
with TemporaryDirectory() as tmp:
with patch("pobsync.commands.run_scheduled.run_rsync", side_effect=fake_run_rsync):
run_scheduled(
prefix=Path(tmp) / "home",
host="web-01",
dry_run=False,
config_source=FakeConfigSource(backup_root=str(Path(tmp) / "backups")),
state_callback=states.append,
)
self.assertEqual(len(states), 3)
self.assertIn("/.incomplete/", states[0]["snapshot"])
self.assertEqual(states[2]["phase"], "finalizing")
self.assertEqual(states[2]["rsync"]["exit_code"], 0)
def test_real_run_keeps_snapshot_with_warning_for_vanished_files(self) -> None:
def fake_run_rsync(command, log_path, timeout_seconds, cancel_check=None):
log_path.write_text(
"file has vanished: \"/var/lib/app/session\"\n"
"rsync warning: some files vanished before they could be transferred (code 24) at main.c(1338) [sender=3.4.1]\n",
encoding="utf-8",
)
data_dir = log_path.parent.parent / "data"
data_dir.mkdir(parents=True, exist_ok=True)
(data_dir / "payload.txt").write_text("payload", encoding="utf-8")
return RsyncResult(exit_code=24, command=command)
with TemporaryDirectory() as tmp:
backup_root = Path(tmp) / "backups"
with patch("pobsync.commands.run_scheduled.run_rsync", side_effect=fake_run_rsync):
result = run_scheduled(
prefix=Path(tmp) / "home",
host="web-01",
dry_run=False,
config_source=FakeConfigSource(backup_root=str(backup_root)),
)
snapshot = Path(result["snapshot"])
self.assertTrue((snapshot / "data" / "payload.txt").exists())
self.assertTrue(result["ok"])
self.assertEqual(result["status"], "warning")
self.assertEqual(result["rsync"]["exit_code"], 24)
self.assertEqual(result["warning"]["category"], "vanished")
self.assertEqual(snapshot.parent.name, "scheduled")
self.assertIn("file has vanished", "\n".join(result["rsync"]["log_tail"]))
def test_dry_run_reports_cancelled_rsync(self) -> None:
def fake_run_rsync(command, log_path, timeout_seconds, cancel_check=None):
self.assertTrue(cancel_check())

View File

@@ -1100,6 +1100,10 @@ class ViewTests(TestCase):
self.assertContains(response, "Remote rsync")
self.assertContains(response, "Remote source root")
self.assertEqual(run.call_count, 3)
commands = [call.kwargs["args"] if "args" in call.kwargs else call.args[0] for call in run.call_args_list]
self.assertEqual(commands[1][-1], "sh -lc 'command -v rsync >/dev/null'")
self.assertEqual(commands[2][-1], "sh -lc 'test -e / && test -r /'")
self.assertNotIn("sh", commands[2][commands[2].index("root@web-01.example.test") + 1 : -1])
host.refresh_from_db()
self.assertTrue(host.config["last_preflight"]["ok"])
self.assertEqual(host.config["last_preflight"]["target"], "root@web-01.example.test")
@@ -1509,7 +1513,8 @@ class ViewTests(TestCase):
self.assertContains(response, "--archive")
self.assertContains(response, "Rsync Log")
self.assertContains(response, "sending incremental file list")
self.assertContains(response, "Dry Run Summary")
self.assertContains(response, "Run Progress")
self.assertContains(response, "dry run")
self.assertContains(response, "Files Seen")
self.assertContains(response, "Would Transfer")
self.assertContains(response, "Transfer Estimate")
@@ -1559,7 +1564,8 @@ class ViewTests(TestCase):
response = self.client.get(reverse("run_detail", args=[run.id]))
self.assertEqual(response.status_code, 200)
self.assertContains(response, "Dry Run Summary")
self.assertContains(response, "Run Progress")
self.assertContains(response, "dry run")
self.assertContains(response, "failed")
self.assertContains(response, "Files Seen")
self.assertContains(response, "25")
@@ -1745,6 +1751,8 @@ class ViewTests(TestCase):
self.assertContains(response, f'data-refresh-url="{reverse("run_detail_live", args=[run.id])}"', html=False)
self.assertContains(response, 'data-refresh-interval="5000"', html=False)
self.assertContains(response, 'data-refresh-active="true"', html=False)
self.assertContains(response, "Live Updates")
self.assertContains(response, "Pause refresh")
def test_run_detail_live_returns_partial_for_active_run(self) -> None:
self.client.force_login(self.staff_user)
@@ -1763,6 +1771,45 @@ class ViewTests(TestCase):
self.assertContains(response, "sending incremental file list")
self.assertNotContains(response, "<html", html=False)
def test_run_detail_live_shows_progress_for_running_real_run(self) -> None:
    """The live partial of a running non-dry-run shows phase, pid and file progress."""
    self.client.force_login(self.staff_user)
    host = HostConfig.objects.create(host="web-01", address="web-01.example.test")
    with TemporaryDirectory() as tmp:
        # Lay out an in-progress snapshot: one transferred file plus a short rsync log.
        snapshot_path = Path(tmp, "backups", host.host, ".incomplete", "20260523-010000Z__ABCDEFGH")
        data_path = snapshot_path / "data"
        log_path = snapshot_path / "meta" / "rsync.log"
        data_path.mkdir(parents=True)
        log_path.parent.mkdir(parents=True)
        (data_path / "payload.txt").write_text("payload", encoding="utf-8")
        log_path.write_text("sending incremental file list\npayload.txt\n", encoding="utf-8")

        execution_state = {
            "phase": "rsync",
            "snapshot": str(snapshot_path),
            "log": str(log_path),
            "heartbeat_at": "2026-05-23T01:00:00+02:00",
        }
        rsync_state = {"pid": 1234, "pgid": 1234, "command": ["rsync"]}
        run = BackupRun.objects.create(
            host=host,
            status=BackupRun.Status.RUNNING,
            snapshot_path=str(snapshot_path),
            result={
                "requested": {"dry_run": False},
                "execution": execution_state,
                "rsync": rsync_state,
            },
        )

        response = self.client.get(reverse("run_detail_live", args=[run.id]))

        self.assertEqual(response.status_code, 200)
        # Checked in the same order as before so the first failure is unchanged.
        expected_fragments = (
            "Run Progress",
            "backup",
            "rsync",
            "1234",
            "Data Files",
            "Open full rsync log",
            "payload.txt",
            "sending incremental file list",
        )
        for fragment in expected_fragments:
            self.assertContains(response, fragment)
def test_run_detail_live_stops_refresh_for_terminal_run(self) -> None:
self.client.force_login(self.staff_user)
host = HostConfig.objects.create(host="web-01", address="web-01.example.test")

View File

@@ -1,5 +1,6 @@
from __future__ import annotations
import os
import json
import shlex
import shutil
@@ -711,6 +712,7 @@ def _run_detail_context(run: BackupRun) -> dict[str, object]:
"rsync_log_path": str(rsync_log_path) if rsync_log_path is not None else "",
"rsync_log_exists": bool(rsync_log_path and rsync_log_path.exists()),
"rsync_log_tail": rsync_log_tail,
"live_progress": _run_live_progress(run, rsync_log_path),
"dry_run_summary": _dry_run_summary(
result=result,
requested=requested,
@@ -1260,6 +1262,97 @@ def _run_rsync_log_tail(rsync_result: dict, log_path: Path | None, *, max_lines:
return []
def _run_live_progress(run: BackupRun, log_path: Path | None) -> dict[str, object]:
    """Build the live-progress payload for a queued or running non-dry-run.

    Returns an empty dict when the run is neither queued nor running, or when
    the stored result marks it as a dry run. Otherwise the dict holds the
    current phase, the recorded worker/rsync process ids, and, when
    available, ``log`` statistics and a ``snapshot`` scan summary.
    """

    def _dict_or_empty(value: object) -> dict:
        # Stored results are free-form; anything that is not a dict counts as absent.
        return value if isinstance(value, dict) else {}

    active_statuses = {BackupRun.Status.QUEUED, BackupRun.Status.RUNNING}
    if run.status not in active_statuses:
        return {}

    payload = _dict_or_empty(run.result)
    requested = _dict_or_empty(payload.get("requested"))
    if requested.get("dry_run") or payload.get("dry_run"):
        return {}

    execution = _dict_or_empty(payload.get("execution"))
    rsync_state = _dict_or_empty(payload.get("rsync"))

    # Without a recorded phase, fall back to a label derived from the run status.
    if run.status == BackupRun.Status.QUEUED:
        fallback_phase = "queued"
    else:
        fallback_phase = "running"

    progress: dict[str, object] = {
        "phase": execution.get("phase") or fallback_phase,
        "worker_pid": execution.get("worker_pid"),
        "rsync_pid": rsync_state.get("pid"),
        "rsync_pgid": rsync_state.get("pgid"),
    }

    log_stats = _live_log_stats(log_path)
    if log_stats:
        progress["log"] = log_stats

    snapshot_path = _run_progress_snapshot_path(run, execution)
    if snapshot_path is None:
        return progress

    # Scan the "data" subdirectory when it exists, otherwise the snapshot root.
    data_dir = snapshot_path / "data"
    scan_root = data_dir if data_dir.exists() else snapshot_path
    snapshot_info: dict[str, object] = {"path": str(snapshot_path)}
    snapshot_info.update(_scan_snapshot_progress(scan_root))
    progress["snapshot"] = snapshot_info
    return progress
def _run_progress_snapshot_path(run: BackupRun, execution: dict) -> Path | None:
    """Return the snapshot directory to inspect for progress, if one is known.

    A non-empty string under ``execution["snapshot"]`` takes precedence over
    ``run.snapshot_path``; ``None`` is returned when neither is set.
    """
    recorded = execution.get("snapshot")
    has_recorded = isinstance(recorded, str) and bool(recorded)
    candidate = recorded if has_recorded else run.snapshot_path
    return Path(candidate) if candidate else None
def _live_log_stats(log_path: Path | None) -> dict[str, object]:
    """Describe the rsync log file: size, modification time and age in seconds.

    Returns an empty dict when no log path is given, and a dict with
    ``exists`` set to ``False`` when the file cannot be stat'ed.
    """
    if log_path is None:
        return {}

    path_text = str(log_path)
    try:
        log_stat = log_path.stat()
    except OSError:
        return {"path": path_text, "exists": False}

    current_tz = timezone.get_current_timezone()
    modified_at = timezone.datetime.fromtimestamp(log_stat.st_mtime, tz=current_tz)
    age_seconds = int((timezone.now() - modified_at).total_seconds())
    return {
        "path": path_text,
        "exists": True,
        "size_bytes": log_stat.st_size,
        "modified_at": modified_at,
        # Never report a negative age, e.g. for a modification time in the future.
        "seconds_since_modified": age_seconds if age_seconds > 0 else 0,
    }
def _scan_snapshot_progress(data_path: Path, *, max_entries: int = 20000) -> dict[str, object]:
    """Count files, directories and apparent bytes below ``data_path``.

    The walk stops once at least ``max_entries`` files and directories have
    been seen, in which case ``scan_limited`` is ``True`` and the counts cover
    only the part visited so far. Files that cannot be stat'ed are skipped.
    """
    present = data_path.exists()
    file_count = 0
    directory_count = 0
    total_bytes = 0
    seen = 0
    truncated = False

    if present:
        for current_dir, subdirs, names in os.walk(data_path):
            # Subdirectories are counted in bulk when their parent is visited.
            directory_count += len(subdirs)
            seen += len(subdirs)
            for name in names:
                try:
                    # lstat so a symlink contributes its own size, not its target's.
                    size = int(Path(current_dir, name).lstat().st_size)
                except OSError:
                    continue
                file_count += 1
                total_bytes += size
                seen += 1
                if seen >= max_entries:
                    truncated = True
                    break
            if truncated or seen >= max_entries:
                truncated = True
                break

    return {
        "data_path": str(data_path),
        "exists": present,
        "files": file_count,
        "directories": directory_count,
        "apparent_size_bytes": total_bytes,
        "scan_limited": truncated,
    }
def _dry_run_summary(
*,
result: dict,