(bugfix) Reconcile failed dry-runs from rsync terminal logs
Classify rsync failures in run results so transport issues such as exit 255 and broken pipes show clearer diagnostic hints. Teach the worker to reconcile running dry-runs when their log already contains a terminal rsync error, and to fail stale dry-runs after their timeout window. This prevents failed rsync processes from leaving runs stuck in the running state indefinitely.
This commit is contained in:
@@ -1,11 +1,12 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import timedelta
|
||||
from pathlib import Path
|
||||
|
||||
from django.db import transaction
|
||||
from django.utils import timezone
|
||||
|
||||
from pobsync.commands.run_scheduled import dry_run_log_path, run_scheduled
|
||||
from pobsync.commands.run_scheduled import DEFAULT_DRY_RUN_TIMEOUT_SECONDS, classify_rsync_failure, dry_run_log_path, run_scheduled
|
||||
from pobsync_backend.config_source import DjangoConfigSource
|
||||
from pobsync_backend.models import BackupRun, HostConfig
|
||||
from pobsync_backend.retention import run_sql_retention_apply
|
||||
@@ -154,6 +155,14 @@ def claim_next_queued_run() -> BackupRun | None:
|
||||
return run
|
||||
|
||||
|
||||
def reconcile_running_runs(*, grace_seconds: int = 300) -> int:
|
||||
reconciled = 0
|
||||
for run in BackupRun.objects.select_related("host").filter(status=BackupRun.Status.RUNNING).order_by("started_at", "id"):
|
||||
if _reconcile_running_run(run=run, grace_seconds=grace_seconds):
|
||||
reconciled += 1
|
||||
return reconciled
|
||||
|
||||
|
||||
def requested_options(run: BackupRun) -> dict[str, object]:
|
||||
requested = run.result.get("requested") if isinstance(run.result, dict) else None
|
||||
if not isinstance(requested, dict):
|
||||
@@ -174,3 +183,83 @@ def _running_result(*, run: BackupRun, dry_run: bool) -> dict[str, object]:
|
||||
|
||||
def _run_cancel_requested(run_id: int) -> bool:
|
||||
return BackupRun.objects.filter(id=run_id, status=BackupRun.Status.CANCELLED).exists()
|
||||
|
||||
|
||||
def _reconcile_running_run(*, run: BackupRun, grace_seconds: int) -> bool:
|
||||
result = run.result if isinstance(run.result, dict) else {}
|
||||
requested = result.get("requested") if isinstance(result.get("requested"), dict) else {}
|
||||
if not requested.get("dry_run"):
|
||||
return False
|
||||
|
||||
log_path = _execution_log_path(result)
|
||||
log_tail = _read_log_tail(log_path) if log_path is not None else []
|
||||
terminal_log = _terminal_rsync_log(log_tail)
|
||||
timed_out = _running_dry_run_timed_out(run=run, grace_seconds=grace_seconds)
|
||||
if not terminal_log and not timed_out:
|
||||
return False
|
||||
|
||||
exit_code = _exit_code_from_log(log_tail) or (124 if timed_out else 255)
|
||||
failure = classify_rsync_failure(exit_code, log_tail)
|
||||
result.update(
|
||||
{
|
||||
"ok": False,
|
||||
"dry_run": True,
|
||||
"host": run.host.host,
|
||||
"base": result.get("base"),
|
||||
"log": str(log_path) if log_path else "",
|
||||
"failure": failure,
|
||||
"rsync": {
|
||||
**(result.get("rsync") if isinstance(result.get("rsync"), dict) else {}),
|
||||
"exit_code": exit_code,
|
||||
"log_tail": log_tail,
|
||||
},
|
||||
}
|
||||
)
|
||||
run.status = BackupRun.Status.FAILED
|
||||
run.ended_at = timezone.now()
|
||||
run.rsync_exit_code = exit_code
|
||||
run.result = result
|
||||
run.save(update_fields=["status", "ended_at", "rsync_exit_code", "result"])
|
||||
return True
|
||||
|
||||
|
||||
def _execution_log_path(result: dict[str, object]) -> Path | None:
|
||||
execution = result.get("execution") if isinstance(result.get("execution"), dict) else {}
|
||||
log = execution.get("log") or result.get("log")
|
||||
if not isinstance(log, str) or not log:
|
||||
return None
|
||||
return Path(log)
|
||||
|
||||
|
||||
def _read_log_tail(log_path: Path | None, *, max_lines: int = 40) -> list[str]:
|
||||
if log_path is None:
|
||||
return []
|
||||
try:
|
||||
return log_path.read_text(encoding="utf-8", errors="replace").splitlines()[-max_lines:]
|
||||
except OSError:
|
||||
return []
|
||||
|
||||
|
||||
def _terminal_rsync_log(log_tail: list[str]) -> bool:
|
||||
return any(line.startswith("rsync error:") for line in log_tail)
|
||||
|
||||
|
||||
def _exit_code_from_log(log_tail: list[str]) -> int | None:
|
||||
for line in reversed(log_tail):
|
||||
if "code 255" in line:
|
||||
return 255
|
||||
if "code 124" in line:
|
||||
return 124
|
||||
if "code 12" in line:
|
||||
return 12
|
||||
return None
|
||||
|
||||
|
||||
def _running_dry_run_timed_out(*, run: BackupRun, grace_seconds: int) -> bool:
|
||||
if run.started_at is None:
|
||||
return False
|
||||
result = run.result if isinstance(run.result, dict) else {}
|
||||
timeout_seconds = result.get("timeout_seconds")
|
||||
if not isinstance(timeout_seconds, int) or timeout_seconds <= 0:
|
||||
timeout_seconds = DEFAULT_DRY_RUN_TIMEOUT_SECONDS
|
||||
return timezone.now() >= run.started_at + timedelta(seconds=timeout_seconds + grace_seconds)
|
||||
|
||||
@@ -8,7 +8,7 @@ from django.conf import settings
|
||||
from django.core.management.base import BaseCommand
|
||||
|
||||
from pobsync.paths import PobsyncPaths
|
||||
from pobsync_backend.backup_runner import claim_next_queued_run, execute_backup_run, requested_options
|
||||
from pobsync_backend.backup_runner import claim_next_queued_run, execute_backup_run, reconcile_running_runs, requested_options
|
||||
|
||||
|
||||
class Command(BaseCommand):
|
||||
@@ -33,9 +33,10 @@ class Command(BaseCommand):
|
||||
time.sleep(max(1, int(options["interval"])))
|
||||
|
||||
def _run_once(self, *, prefix: Path) -> int:
|
||||
reconciled = reconcile_running_runs()
|
||||
run = claim_next_queued_run()
|
||||
if run is None:
|
||||
return 0
|
||||
return reconciled
|
||||
|
||||
options = requested_options(run)
|
||||
try:
|
||||
@@ -49,4 +50,4 @@ class Command(BaseCommand):
|
||||
)
|
||||
except Exception as exc:
|
||||
self.stderr.write(f"{run.host.host}: {type(exc).__name__}: {exc}")
|
||||
return 1
|
||||
return reconciled + 1
|
||||
|
||||
@@ -1,13 +1,15 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import timedelta
|
||||
from pathlib import Path
|
||||
from tempfile import TemporaryDirectory
|
||||
from unittest.mock import patch
|
||||
|
||||
from django.test import TestCase
|
||||
from django.utils import timezone
|
||||
|
||||
from pobsync.util import write_yaml_atomic
|
||||
from pobsync_backend.backup_runner import queue_backup_run
|
||||
from pobsync_backend.backup_runner import queue_backup_run, reconcile_running_runs
|
||||
from pobsync_backend.management.commands.run_pobsync_worker import Command
|
||||
from pobsync_backend.models import BackupRun, GlobalConfig, HostConfig, SnapshotRecord
|
||||
|
||||
@@ -133,3 +135,43 @@ class BackupWorkerTests(TestCase):
|
||||
count = Command()._run_once(prefix=Path("/opt/pobsync"))
|
||||
|
||||
self.assertEqual(count, 0)
|
||||
|
||||
def test_worker_reconciles_running_dry_run_with_terminal_broken_pipe_log(self) -> None:
|
||||
host = HostConfig.objects.create(host="web-01", address="web-01.example.test")
|
||||
run = queue_backup_run(host=host, dry_run=True)
|
||||
log_path = Path("/tmp/pobsync-dryrun/web-01/run-test-broken-pipe.log")
|
||||
log_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
log_path.write_text(
|
||||
"rsync error: unexplained error (code 255) at rsync.c(716) [generator=3.4.1]\n"
|
||||
"rsync: [generator] write error: Broken pipe (32)\n",
|
||||
encoding="utf-8",
|
||||
)
|
||||
run.status = BackupRun.Status.RUNNING
|
||||
run.started_at = timezone.now()
|
||||
run.result["execution"] = {"log": str(log_path)}
|
||||
run.save(update_fields=["status", "started_at", "result"])
|
||||
|
||||
reconciled = reconcile_running_runs()
|
||||
|
||||
self.assertEqual(reconciled, 1)
|
||||
run.refresh_from_db()
|
||||
self.assertEqual(run.status, BackupRun.Status.FAILED)
|
||||
self.assertEqual(run.rsync_exit_code, 255)
|
||||
self.assertEqual(run.result["failure"]["category"], "transport")
|
||||
self.assertIn("Broken pipe", "\n".join(run.result["rsync"]["log_tail"]))
|
||||
|
||||
def test_worker_reconciles_stale_running_dry_run_after_timeout(self) -> None:
|
||||
host = HostConfig.objects.create(host="web-01", address="web-01.example.test")
|
||||
run = queue_backup_run(host=host, dry_run=True)
|
||||
run.status = BackupRun.Status.RUNNING
|
||||
run.started_at = timezone.now() - timedelta(seconds=1300)
|
||||
run.result["timeout_seconds"] = 900
|
||||
run.save(update_fields=["status", "started_at", "result"])
|
||||
|
||||
reconciled = reconcile_running_runs(grace_seconds=300)
|
||||
|
||||
self.assertEqual(reconciled, 1)
|
||||
run.refresh_from_db()
|
||||
self.assertEqual(run.status, BackupRun.Status.FAILED)
|
||||
self.assertEqual(run.rsync_exit_code, 124)
|
||||
self.assertEqual(run.result["failure"]["category"], "timeout")
|
||||
|
||||
@@ -66,6 +66,30 @@ class RunScheduledConfigSourceTests(SimpleTestCase):
|
||||
self.assertFalse(result["ok"])
|
||||
self.assertEqual(result["rsync"]["exit_code"], 12)
|
||||
self.assertEqual(result["rsync"]["log_tail"], ["Permission denied (publickey).", "rsync error"])
|
||||
self.assertEqual(result["failure"]["category"], "permissions")
|
||||
|
||||
def test_failed_dry_run_classifies_broken_pipe(self) -> None:
|
||||
def fake_run_rsync(command, log_path, timeout_seconds, cancel_check=None):
|
||||
log_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
log_path.write_text(
|
||||
"rsync error: unexplained error (code 255) at rsync.c(716) [generator=3.4.1]\n"
|
||||
"rsync: [generator] write error: Broken pipe (32)\n",
|
||||
encoding="utf-8",
|
||||
)
|
||||
return RsyncResult(exit_code=255, command=command)
|
||||
|
||||
with patch("pobsync.commands.run_scheduled.run_rsync", side_effect=fake_run_rsync):
|
||||
result = run_scheduled(
|
||||
prefix=Path("/missing-prefix"),
|
||||
host="web-01",
|
||||
dry_run=True,
|
||||
config_source=FakeConfigSource(),
|
||||
)
|
||||
|
||||
self.assertFalse(result["ok"])
|
||||
self.assertEqual(result["rsync"]["exit_code"], 255)
|
||||
self.assertEqual(result["failure"]["category"], "transport")
|
||||
self.assertIn("broken pipe", result["failure"]["hint"].lower())
|
||||
|
||||
def test_dry_run_clears_previous_log_before_running(self) -> None:
|
||||
def fake_run_rsync(command, log_path, timeout_seconds, cancel_check=None):
|
||||
|
||||
Reference in New Issue
Block a user