diff --git a/src/pobsync/commands/run_scheduled.py b/src/pobsync/commands/run_scheduled.py index 79e5d7e..435e5ac 100644 --- a/src/pobsync/commands/run_scheduled.py +++ b/src/pobsync/commands/run_scheduled.py @@ -38,6 +38,39 @@ def _read_log_tail(log_path: Path, *, max_lines: int = 40) -> list[str]: return lines[-max_lines:] +def classify_rsync_failure(exit_code: int | None, log_tail: list[str]) -> dict[str, str]: + joined_tail = "\n".join(log_tail).lower() + if exit_code == 255 and "broken pipe" in joined_tail: + return { + "category": "transport", + "message": "Rsync transport closed unexpectedly.", + "hint": "The SSH/rsync stream ended with a broken pipe. Check remote rsync availability, remote shell output, excludes, and connection stability.", + } + if exit_code == 255: + return { + "category": "transport", + "message": "Rsync transport failed.", + "hint": "Exit 255 usually comes from SSH or remote rsync startup. Check SSH access, known_hosts, remote rsync, and remote shell output.", + } + if exit_code == 124: + return { + "category": "timeout", + "message": "Rsync timed out.", + "hint": "Increase the rsync timeout or narrow the backup scope with source root, includes, or excludes.", + } + if "permission denied" in joined_tail: + return { + "category": "permissions", + "message": "Rsync hit a permission error.", + "hint": "Check the SSH user, key, and permissions on the remote source.", + } + return { + "category": "rsync", + "message": "Rsync failed.", + "hint": "Check the rsync log tail for the underlying error.", + } + + def _host_backup_dirs(backup_root: str, host: str) -> HostBackupDirs: return HostBackupDirs(root=Path(backup_root) / host) @@ -187,7 +220,7 @@ def run_scheduled( ) log_tail = _read_log_tail(dryrun_log) - return { + response = { "ok": result.exit_code == 0, "dry_run": True, "host": host, @@ -202,6 +235,9 @@ def run_scheduled( "log_tail": log_tail, }, } + if result.exit_code != 0: + response["failure"] = classify_rsync_failure(result.exit_code, log_tail) + return response # ------------------------------------------------------------ # REAL RUN @@ -283,6 +319,7 @@ def run_scheduled( } if result.exit_code != 0: + log_tail = _read_log_tail(log_path) return { "ok": False, "dry_run": False, @@ -295,8 +332,9 @@ def run_scheduled( "rsync": { "exit_code": result.exit_code, "command": result.command, - "log_tail": _read_log_tail(log_path), + "log_tail": log_tail, }, + "failure": classify_rsync_failure(result.exit_code, log_tail), } final_dir = dirs.scheduled / snap_name diff --git a/src/pobsync_backend/backup_runner.py b/src/pobsync_backend/backup_runner.py index f62e10e..8f4876b 100644 --- a/src/pobsync_backend/backup_runner.py +++ b/src/pobsync_backend/backup_runner.py @@ -1,11 +1,12 @@ from __future__ import annotations +from datetime import timedelta from pathlib import Path from django.db import transaction from django.utils import timezone -from pobsync.commands.run_scheduled import dry_run_log_path, run_scheduled +from pobsync.commands.run_scheduled import DEFAULT_DRY_RUN_TIMEOUT_SECONDS, classify_rsync_failure, dry_run_log_path, run_scheduled from pobsync_backend.config_source import DjangoConfigSource from pobsync_backend.models import BackupRun, HostConfig from pobsync_backend.retention import run_sql_retention_apply @@ -154,6 +155,14 @@ def claim_next_queued_run() -> BackupRun | None: return run +def reconcile_running_runs(*, grace_seconds: int = 300) -> int: + reconciled = 0 + for run in BackupRun.objects.select_related("host").filter(status=BackupRun.Status.RUNNING).order_by("started_at", "id"): + if _reconcile_running_run(run=run, grace_seconds=grace_seconds): + reconciled += 1 + return reconciled + + def requested_options(run: BackupRun) -> dict[str, object]: requested = run.result.get("requested") if isinstance(run.result, dict) else None if not isinstance(requested, dict): @@ -174,3 +183,83 @@ def _running_result(*, run: BackupRun, dry_run: bool) -> dict[str, object]: def _run_cancel_requested(run_id: int) -> bool: return BackupRun.objects.filter(id=run_id, status=BackupRun.Status.CANCELLED).exists() + + +def _reconcile_running_run(*, run: BackupRun, grace_seconds: int) -> bool: + result = run.result if isinstance(run.result, dict) else {} + requested = result.get("requested") if isinstance(result.get("requested"), dict) else {} + if not requested.get("dry_run"): + return False + + log_path = _execution_log_path(result) + log_tail = _read_log_tail(log_path) if log_path is not None else [] + terminal_log = _terminal_rsync_log(log_tail) + timed_out = _running_dry_run_timed_out(run=run, grace_seconds=grace_seconds) + if not terminal_log and not timed_out: + return False + + exit_code = _exit_code_from_log(log_tail) or (124 if timed_out else 255) + failure = classify_rsync_failure(exit_code, log_tail) + result.update( + { + "ok": False, + "dry_run": True, + "host": run.host.host, + "base": result.get("base"), + "log": str(log_path) if log_path else "", + "failure": failure, + "rsync": { + **(result.get("rsync") if isinstance(result.get("rsync"), dict) else {}), + "exit_code": exit_code, + "log_tail": log_tail, + }, + } + ) + run.status = BackupRun.Status.FAILED + run.ended_at = timezone.now() + run.rsync_exit_code = exit_code + run.result = result + run.save(update_fields=["status", "ended_at", "rsync_exit_code", "result"]) + return True + + +def _execution_log_path(result: dict[str, object]) -> Path | None: + execution = result.get("execution") if isinstance(result.get("execution"), dict) else {} + log = execution.get("log") or result.get("log") + if not isinstance(log, str) or not log: + return None + return Path(log) + + +def _read_log_tail(log_path: Path | None, *, max_lines: int = 40) -> list[str]: + if log_path is None: + return [] + try: + return log_path.read_text(encoding="utf-8", errors="replace").splitlines()[-max_lines:] + except OSError: + return [] + + +def _terminal_rsync_log(log_tail: list[str]) -> bool: + return any(line.startswith("rsync error:") for line in log_tail) + + +def _exit_code_from_log(log_tail: list[str]) -> int | None: + for line in reversed(log_tail): + if "code 255" in line: + return 255 + if "code 124" in line: + return 124 + if "code 12" in line: + return 12 + return None + + +def _running_dry_run_timed_out(*, run: BackupRun, grace_seconds: int) -> bool: + if run.started_at is None: + return False + result = run.result if isinstance(run.result, dict) else {} + timeout_seconds = result.get("timeout_seconds") + if not isinstance(timeout_seconds, int) or timeout_seconds <= 0: + timeout_seconds = DEFAULT_DRY_RUN_TIMEOUT_SECONDS + return timezone.now() >= run.started_at + timedelta(seconds=timeout_seconds + grace_seconds) diff --git a/src/pobsync_backend/management/commands/run_pobsync_worker.py b/src/pobsync_backend/management/commands/run_pobsync_worker.py index 848786c..303c951 100644 --- a/src/pobsync_backend/management/commands/run_pobsync_worker.py +++ b/src/pobsync_backend/management/commands/run_pobsync_worker.py @@ -8,7 +8,7 @@ from django.conf import settings from django.core.management.base import BaseCommand from pobsync.paths import PobsyncPaths -from pobsync_backend.backup_runner import claim_next_queued_run, execute_backup_run, requested_options +from pobsync_backend.backup_runner import claim_next_queued_run, execute_backup_run, reconcile_running_runs, requested_options class Command(BaseCommand): @@ -33,9 +33,10 @@ class Command(BaseCommand): time.sleep(max(1, int(options["interval"]))) def _run_once(self, *, prefix: Path) -> int: + reconciled = reconcile_running_runs() run = claim_next_queued_run() if run is None: - return 0 + return reconciled options = requested_options(run) try: @@ -49,4 +50,4 @@ class Command(BaseCommand): ) except Exception as exc: self.stderr.write(f"{run.host.host}: {type(exc).__name__}: {exc}") - return 1 + return reconciled + 1 diff --git a/src/pobsync_backend/tests/test_backup_worker.py b/src/pobsync_backend/tests/test_backup_worker.py index 26739bb..f24d49c 100644 --- a/src/pobsync_backend/tests/test_backup_worker.py +++ b/src/pobsync_backend/tests/test_backup_worker.py @@ -1,13 +1,15 @@ from __future__ import annotations +from datetime import timedelta from pathlib import Path from tempfile import TemporaryDirectory from unittest.mock import patch from django.test import TestCase +from django.utils import timezone from pobsync.util import write_yaml_atomic -from pobsync_backend.backup_runner import queue_backup_run +from pobsync_backend.backup_runner import queue_backup_run, reconcile_running_runs from pobsync_backend.management.commands.run_pobsync_worker import Command from pobsync_backend.models import BackupRun, GlobalConfig, HostConfig, SnapshotRecord @@ -133,3 +135,43 @@ class BackupWorkerTests(TestCase): count = Command()._run_once(prefix=Path("/opt/pobsync")) self.assertEqual(count, 0) + + def test_worker_reconciles_running_dry_run_with_terminal_broken_pipe_log(self) -> None: + host = HostConfig.objects.create(host="web-01", address="web-01.example.test") + run = queue_backup_run(host=host, dry_run=True) + log_path = Path("/tmp/pobsync-dryrun/web-01/run-test-broken-pipe.log") + log_path.parent.mkdir(parents=True, exist_ok=True) + log_path.write_text( + "rsync error: unexplained error (code 255) at rsync.c(716) [generator=3.4.1]\n" + "rsync: [generator] write error: Broken pipe (32)\n", + encoding="utf-8", + ) + run.status = BackupRun.Status.RUNNING + run.started_at = timezone.now() + run.result["execution"] = {"log": str(log_path)} + run.save(update_fields=["status", "started_at", "result"]) + + reconciled = reconcile_running_runs() + + self.assertEqual(reconciled, 1) + run.refresh_from_db() + self.assertEqual(run.status, BackupRun.Status.FAILED) + self.assertEqual(run.rsync_exit_code, 255) + self.assertEqual(run.result["failure"]["category"], "transport") + self.assertIn("Broken pipe", "\n".join(run.result["rsync"]["log_tail"])) + + def test_worker_reconciles_stale_running_dry_run_after_timeout(self) -> None: + host = HostConfig.objects.create(host="web-01", address="web-01.example.test") + run = queue_backup_run(host=host, dry_run=True) + run.status = BackupRun.Status.RUNNING + run.started_at = timezone.now() - timedelta(seconds=1300) + run.result["timeout_seconds"] = 900 + run.save(update_fields=["status", "started_at", "result"]) + + reconciled = reconcile_running_runs(grace_seconds=300) + + self.assertEqual(reconciled, 1) + run.refresh_from_db() + self.assertEqual(run.status, BackupRun.Status.FAILED) + self.assertEqual(run.rsync_exit_code, 124) + self.assertEqual(run.result["failure"]["category"], "timeout") diff --git a/src/pobsync_backend/tests/test_run_scheduled_config_source.py b/src/pobsync_backend/tests/test_run_scheduled_config_source.py index d486062..0d97e87 100644 --- a/src/pobsync_backend/tests/test_run_scheduled_config_source.py +++ b/src/pobsync_backend/tests/test_run_scheduled_config_source.py @@ -66,6 +66,30 @@ class RunScheduledConfigSourceTests(SimpleTestCase): self.assertFalse(result["ok"]) self.assertEqual(result["rsync"]["exit_code"], 12) self.assertEqual(result["rsync"]["log_tail"], ["Permission denied (publickey).", "rsync error"]) + self.assertEqual(result["failure"]["category"], "permissions") + + def test_failed_dry_run_classifies_broken_pipe(self) -> None: + def fake_run_rsync(command, log_path, timeout_seconds, cancel_check=None): + log_path.parent.mkdir(parents=True, exist_ok=True) + log_path.write_text( + "rsync error: unexplained error (code 255) at rsync.c(716) [generator=3.4.1]\n" + "rsync: [generator] write error: Broken pipe (32)\n", + encoding="utf-8", + ) + return RsyncResult(exit_code=255, command=command) + + with patch("pobsync.commands.run_scheduled.run_rsync", side_effect=fake_run_rsync): + result = run_scheduled( + prefix=Path("/missing-prefix"), + host="web-01", + dry_run=True, + config_source=FakeConfigSource(), + ) + + self.assertFalse(result["ok"]) + self.assertEqual(result["rsync"]["exit_code"], 255) + self.assertEqual(result["failure"]["category"], "transport") + self.assertIn("broken pipe", result["failure"]["hint"].lower()) def test_dry_run_clears_previous_log_before_running(self) -> None: def fake_run_rsync(command, log_path, timeout_seconds, cancel_check=None):