Merge pull request 'issue-54-worker-rsync-state' (#56) from issue-54-worker-rsync-state into master
Reviewed-on: #56
This commit was merged in pull request #56.
This commit is contained in:
@@ -23,6 +23,7 @@ from ..util import ensure_dir, realpath_startswith, sanitize_host, write_yaml_at
|
||||
|
||||
|
||||
DEFAULT_DRY_RUN_TIMEOUT_SECONDS = 900
|
||||
RSYNC_PARTIAL_VANISHED_EXIT_CODE = 24
|
||||
|
||||
|
||||
def dry_run_log_path(host: str, run_id: int | None = None) -> Path:
|
||||
@@ -72,6 +73,24 @@ def classify_rsync_failure(exit_code: int | None, log_tail: list[str]) -> dict[s
|
||||
}
|
||||
|
||||
|
||||
def classify_rsync_warning(exit_code: int | None, log_tail: list[str]) -> dict[str, str] | None:
|
||||
joined_tail = "\n".join(log_tail).lower()
|
||||
if exit_code == RSYNC_PARTIAL_VANISHED_EXIT_CODE:
|
||||
return {
|
||||
"category": "vanished",
|
||||
"message": "Some source files vanished during rsync.",
|
||||
"hint": "This is common on live systems. The snapshot was kept, but review the rsync log if this happens often.",
|
||||
}
|
||||
if exit_code in (None, RSYNC_PARTIAL_VANISHED_EXIT_CODE) and (
|
||||
"file has vanished" in joined_tail or "vanished before it could be transferred" in joined_tail
|
||||
):
|
||||
return {
|
||||
"category": "vanished",
|
||||
"message": "Some source files vanished during rsync.",
|
||||
"hint": "This is common on live systems. The snapshot was kept, but review the rsync log if this happens often.",
|
||||
}
|
||||
return None
|
||||
|
||||
def _collect_run_stats(
|
||||
*,
|
||||
log_path: Path,
|
||||
@@ -158,6 +177,7 @@ def run_scheduled(
|
||||
run_id: int | None = None,
|
||||
cancel_check: Callable[[], bool] | None = None,
|
||||
verbose_output: bool = False,
|
||||
state_callback: Callable[[dict[str, Any]], None] | None = None,
|
||||
) -> dict[str, Any]:
|
||||
|
||||
host = sanitize_host(host)
|
||||
@@ -322,14 +342,59 @@ def run_scheduled(
|
||||
|
||||
log_path.touch(exist_ok=True)
|
||||
write_yaml_atomic(meta_path, meta)
|
||||
if state_callback is not None:
|
||||
state_callback(
|
||||
{
|
||||
"status": "running",
|
||||
"phase": "preparing",
|
||||
"snapshot": str(incomplete_dir),
|
||||
"log": str(log_path),
|
||||
"rsync": {"command": cmd, "exit_code": None},
|
||||
}
|
||||
)
|
||||
|
||||
result = run_rsync(cmd, log_path=log_path, timeout_seconds=timeout_seconds, cancel_check=cancel_check)
|
||||
def process_started(pid: int, pgid: int) -> None:
|
||||
if state_callback is None:
|
||||
return
|
||||
state_callback(
|
||||
{
|
||||
"status": "running",
|
||||
"phase": "rsync",
|
||||
"snapshot": str(incomplete_dir),
|
||||
"log": str(log_path),
|
||||
"rsync": {"command": cmd, "exit_code": None, "pid": pid, "pgid": pgid},
|
||||
}
|
||||
)
|
||||
|
||||
run_rsync_kwargs: dict[str, Any] = {
|
||||
"log_path": log_path,
|
||||
"timeout_seconds": timeout_seconds,
|
||||
"cancel_check": cancel_check,
|
||||
}
|
||||
if state_callback is not None:
|
||||
run_rsync_kwargs["process_started"] = process_started
|
||||
result = run_rsync(cmd, **run_rsync_kwargs)
|
||||
log_tail = _read_log_tail(log_path)
|
||||
warning = classify_rsync_warning(result.exit_code, log_tail)
|
||||
successful_or_warning = result.exit_code == 0 or warning is not None
|
||||
if state_callback is not None:
|
||||
state_callback(
|
||||
{
|
||||
"status": "running",
|
||||
"phase": "finalizing",
|
||||
"snapshot": str(incomplete_dir),
|
||||
"log": str(log_path),
|
||||
"rsync": {"command": cmd, "exit_code": result.exit_code, "log_tail": log_tail},
|
||||
}
|
||||
)
|
||||
|
||||
end_ts = utc_now()
|
||||
meta["ended_at"] = format_iso_z(end_ts)
|
||||
meta["duration_seconds"] = int((end_ts - ts).total_seconds())
|
||||
meta["rsync"]["exit_code"] = result.exit_code
|
||||
meta["status"] = "cancelled" if result.cancelled else ("success" if result.exit_code == 0 else "failed")
|
||||
meta["status"] = "cancelled" if result.cancelled else ("warning" if warning else ("success" if result.exit_code == 0 else "failed"))
|
||||
if warning is not None:
|
||||
meta["warning"] = warning
|
||||
meta["stats"] = _collect_run_stats(
|
||||
log_path=log_path,
|
||||
backup_root=Path(backup_root),
|
||||
@@ -349,8 +414,7 @@ def run_scheduled(
|
||||
"error": "rsync.log missing after execution",
|
||||
}
|
||||
|
||||
if result.exit_code != 0:
|
||||
log_tail = _read_log_tail(log_path)
|
||||
if not successful_or_warning:
|
||||
return {
|
||||
"ok": False,
|
||||
"dry_run": False,
|
||||
@@ -404,7 +468,9 @@ def run_scheduled(
|
||||
"snapshot": str(final_dir),
|
||||
"base": str(base_dir) if base_dir else None,
|
||||
"log": str(final_log_path),
|
||||
"rsync": {"exit_code": result.exit_code},
|
||||
"status": meta["status"],
|
||||
"warning": warning,
|
||||
"rsync": {"exit_code": result.exit_code, "log_tail": log_tail},
|
||||
"verbose_output": bool(verbose_output),
|
||||
"duration_seconds": meta["duration_seconds"],
|
||||
"stats": meta["stats"],
|
||||
|
||||
@@ -82,6 +82,7 @@ def run_rsync(
|
||||
log_path: Path,
|
||||
timeout_seconds: int,
|
||||
cancel_check: Callable[[], bool] | None = None,
|
||||
process_started: Callable[[int, int], None] | None = None,
|
||||
) -> RsyncResult:
|
||||
"""
|
||||
Run rsync and always write stdout/stderr to log_path.
|
||||
@@ -95,6 +96,8 @@ def run_rsync(
|
||||
|
||||
with log_path.open("ab") as f:
|
||||
process = subprocess.Popen(command, stdout=f, stderr=subprocess.STDOUT, start_new_session=True)
|
||||
if process_started is not None:
|
||||
process_started(process.pid, os.getpgid(process.pid))
|
||||
started = time.monotonic()
|
||||
while True:
|
||||
exit_code = process.poll()
|
||||
|
||||
@@ -8,7 +8,13 @@ from pathlib import Path
|
||||
from django.db import transaction
|
||||
from django.utils import timezone
|
||||
|
||||
from pobsync.commands.run_scheduled import DEFAULT_DRY_RUN_TIMEOUT_SECONDS, classify_rsync_failure, dry_run_log_path, run_scheduled
|
||||
from pobsync.commands.run_scheduled import (
|
||||
DEFAULT_DRY_RUN_TIMEOUT_SECONDS,
|
||||
classify_rsync_failure,
|
||||
classify_rsync_warning,
|
||||
dry_run_log_path,
|
||||
run_scheduled,
|
||||
)
|
||||
from pobsync_backend.config_source import DjangoConfigSource
|
||||
from pobsync_backend.models import BackupRun, HostConfig
|
||||
from pobsync_backend.retention import run_sql_retention_apply
|
||||
@@ -66,6 +72,7 @@ def execute_backup_run(
|
||||
run_id=run.id,
|
||||
cancel_check=lambda: _run_cancel_requested(run.id),
|
||||
verbose_output=bool(dry_run or verbose_output),
|
||||
state_callback=lambda state: _record_running_state(run.id, state),
|
||||
)
|
||||
except Exception as exc:
|
||||
run.refresh_from_db()
|
||||
@@ -83,6 +90,8 @@ def execute_backup_run(
|
||||
run.refresh_from_db()
|
||||
if result.get("cancelled") or run.status == BackupRun.Status.CANCELLED:
|
||||
run.status = BackupRun.Status.CANCELLED
|
||||
elif result.get("status") == BackupRun.Status.WARNING:
|
||||
run.status = BackupRun.Status.WARNING
|
||||
else:
|
||||
run.status = BackupRun.Status.SUCCESS if result.get("ok") else BackupRun.Status.FAILED
|
||||
run.ended_at = timezone.now()
|
||||
@@ -201,11 +210,98 @@ def _run_cancel_requested(run_id: int) -> bool:
|
||||
return False
|
||||
|
||||
|
||||
def _record_running_state(run_id: int, state: dict[str, object]) -> None:
|
||||
try:
|
||||
run = BackupRun.objects.only("id", "status", "result", "snapshot_path", "rsync_exit_code").get(id=run_id)
|
||||
except BackupRun.DoesNotExist:
|
||||
return
|
||||
if run.status != BackupRun.Status.RUNNING:
|
||||
return
|
||||
|
||||
result = run.result if isinstance(run.result, dict) else {}
|
||||
execution = result.get("execution") if isinstance(result.get("execution"), dict) else {}
|
||||
rsync = result.get("rsync") if isinstance(result.get("rsync"), dict) else {}
|
||||
incoming_rsync = state.get("rsync") if isinstance(state.get("rsync"), dict) else {}
|
||||
|
||||
log_path = state.get("log")
|
||||
snapshot_path = state.get("snapshot")
|
||||
phase = state.get("phase")
|
||||
if isinstance(phase, str) and phase:
|
||||
execution["phase"] = phase
|
||||
if isinstance(log_path, str) and log_path:
|
||||
execution["log"] = log_path
|
||||
if isinstance(snapshot_path, str) and snapshot_path:
|
||||
execution["snapshot"] = snapshot_path
|
||||
run.snapshot_path = snapshot_path
|
||||
if incoming_rsync:
|
||||
result["rsync"] = {**rsync, **incoming_rsync}
|
||||
exit_code = incoming_rsync.get("exit_code")
|
||||
if isinstance(exit_code, int):
|
||||
run.rsync_exit_code = exit_code
|
||||
result["execution"] = {
|
||||
**execution,
|
||||
"worker_pid": os.getpid(),
|
||||
"worker_host": socket.gethostname(),
|
||||
"heartbeat_at": timezone.now().isoformat(),
|
||||
}
|
||||
run.result = result
|
||||
run.save(update_fields=["snapshot_path", "rsync_exit_code", "result"])
|
||||
|
||||
|
||||
def _reconcile_running_run(*, run: BackupRun, grace_seconds: int, stale_worker_seconds: int) -> bool:
|
||||
result = run.result if isinstance(run.result, dict) else {}
|
||||
requested = result.get("requested") if isinstance(result.get("requested"), dict) else {}
|
||||
log_path = _execution_log_path(result)
|
||||
log_tail = _read_log_tail(log_path) if log_path is not None else []
|
||||
terminal_log = _terminal_rsync_log(log_tail)
|
||||
exit_code = _exit_code_from_log(log_tail)
|
||||
stale_worker = _running_worker_timed_out(run=run, stale_worker_seconds=stale_worker_seconds)
|
||||
if not requested.get("dry_run"):
|
||||
if terminal_log:
|
||||
failure = classify_rsync_failure(exit_code or 255, log_tail)
|
||||
result.update(
|
||||
{
|
||||
"ok": False,
|
||||
"host": run.host.host,
|
||||
"log": str(log_path) if log_path else "",
|
||||
"failure": failure,
|
||||
"rsync": {
|
||||
**(result.get("rsync") if isinstance(result.get("rsync"), dict) else {}),
|
||||
"exit_code": exit_code or 255,
|
||||
"log_tail": log_tail,
|
||||
},
|
||||
}
|
||||
)
|
||||
run.status = BackupRun.Status.FAILED
|
||||
run.ended_at = timezone.now()
|
||||
run.rsync_exit_code = exit_code or 255
|
||||
run.result = result
|
||||
run.save(update_fields=["status", "ended_at", "rsync_exit_code", "result"])
|
||||
return True
|
||||
if _running_rsync_process_missing(run=run, grace_seconds=grace_seconds):
|
||||
result.update(
|
||||
{
|
||||
"ok": False,
|
||||
"host": run.host.host,
|
||||
"log": str(log_path) if log_path else "",
|
||||
"failure": {
|
||||
"category": "rsync_process",
|
||||
"message": "The rsync process is no longer running while the backup is still marked running.",
|
||||
"hint": "Check the rsync log and pobsync-worker.service logs before retrying the backup.",
|
||||
},
|
||||
"rsync": {
|
||||
**(result.get("rsync") if isinstance(result.get("rsync"), dict) else {}),
|
||||
"exit_code": exit_code or 255,
|
||||
"log_tail": log_tail,
|
||||
},
|
||||
}
|
||||
)
|
||||
run.status = BackupRun.Status.FAILED
|
||||
run.ended_at = timezone.now()
|
||||
run.rsync_exit_code = exit_code or 255
|
||||
run.result = result
|
||||
run.save(update_fields=["status", "ended_at", "rsync_exit_code", "result"])
|
||||
return True
|
||||
if stale_worker:
|
||||
result.update(
|
||||
{
|
||||
@@ -225,14 +321,11 @@ def _reconcile_running_run(*, run: BackupRun, grace_seconds: int, stale_worker_s
|
||||
return True
|
||||
return False
|
||||
|
||||
log_path = _execution_log_path(result)
|
||||
log_tail = _read_log_tail(log_path) if log_path is not None else []
|
||||
terminal_log = _terminal_rsync_log(log_tail)
|
||||
timed_out = _running_dry_run_timed_out(run=run, grace_seconds=grace_seconds)
|
||||
if not terminal_log and not timed_out and not stale_worker:
|
||||
return False
|
||||
|
||||
exit_code = _exit_code_from_log(log_tail) or (124 if timed_out or stale_worker else 255)
|
||||
exit_code = exit_code or (124 if timed_out or stale_worker else 255)
|
||||
failure = classify_rsync_failure(exit_code, log_tail)
|
||||
if stale_worker and not terminal_log:
|
||||
failure = {
|
||||
@@ -305,6 +398,9 @@ def _read_log_tail(log_path: Path | None, *, max_lines: int = 40) -> list[str]:
|
||||
|
||||
|
||||
def _terminal_rsync_log(log_tail: list[str]) -> bool:
|
||||
warning = classify_rsync_warning(_exit_code_from_log(log_tail), log_tail)
|
||||
if warning is not None:
|
||||
return False
|
||||
return any(line.startswith("rsync error:") for line in log_tail)
|
||||
|
||||
|
||||
@@ -312,6 +408,8 @@ def _exit_code_from_log(log_tail: list[str]) -> int | None:
|
||||
for line in reversed(log_tail):
|
||||
if "code 255" in line:
|
||||
return 255
|
||||
if "code 24" in line:
|
||||
return 24
|
||||
if "code 124" in line:
|
||||
return 124
|
||||
if "code 12" in line:
|
||||
@@ -342,6 +440,33 @@ def _running_worker_timed_out(*, run: BackupRun, stale_worker_seconds: int) -> b
|
||||
return timezone.now() >= heartbeat_at + timedelta(seconds=stale_worker_seconds)
|
||||
|
||||
|
||||
def _running_rsync_process_missing(*, run: BackupRun, grace_seconds: int) -> bool:
|
||||
if grace_seconds <= 0:
|
||||
return False
|
||||
result = run.result if isinstance(run.result, dict) else {}
|
||||
execution = result.get("execution") if isinstance(result.get("execution"), dict) else {}
|
||||
if execution.get("phase") != "rsync":
|
||||
return False
|
||||
rsync = result.get("rsync") if isinstance(result.get("rsync"), dict) else {}
|
||||
pid = rsync.get("pid")
|
||||
if not isinstance(pid, int) or pid <= 0:
|
||||
return False
|
||||
heartbeat_at = _parse_iso_datetime(execution.get("heartbeat_at")) or run.started_at
|
||||
if heartbeat_at is None or timezone.now() < heartbeat_at + timedelta(seconds=grace_seconds):
|
||||
return False
|
||||
return not _process_exists(pid)
|
||||
|
||||
|
||||
def _process_exists(pid: int) -> bool:
|
||||
try:
|
||||
os.kill(pid, 0)
|
||||
except ProcessLookupError:
|
||||
return False
|
||||
except PermissionError:
|
||||
return True
|
||||
return True
|
||||
|
||||
|
||||
def _parse_iso_datetime(value: object):
|
||||
if not isinstance(value, str) or not value:
|
||||
return None
|
||||
|
||||
@@ -85,6 +85,37 @@ class BackupWorkerTests(TestCase):
|
||||
self.assertEqual(SnapshotRecord.objects.count(), 1)
|
||||
self.assertEqual(run.snapshot, SnapshotRecord.objects.get())
|
||||
|
||||
def test_worker_records_warning_status_from_completed_run(self) -> None:
|
||||
with TemporaryDirectory() as tmp:
|
||||
backup_root = Path(tmp) / "backups"
|
||||
GlobalConfig.objects.create(name="default", backup_root=str(backup_root))
|
||||
host = HostConfig.objects.create(host="web-01", address="web-01.example.test")
|
||||
snapshot_dir = backup_root / host.host / "scheduled" / "20260519-021500Z__ABCDEFGH"
|
||||
meta_dir = snapshot_dir / "meta"
|
||||
meta_dir.mkdir(parents=True)
|
||||
write_yaml_atomic(meta_dir / "meta.yaml", {"status": "warning", "started_at": "2026-05-19T02:15:00Z"})
|
||||
run = queue_backup_run(host=host)
|
||||
|
||||
with patch("pobsync_backend.backup_runner.run_scheduled") as run_scheduled:
|
||||
run_scheduled.return_value = {
|
||||
"ok": True,
|
||||
"status": "warning",
|
||||
"dry_run": False,
|
||||
"host": host.host,
|
||||
"snapshot": str(snapshot_dir),
|
||||
"base": None,
|
||||
"warning": {"category": "vanished"},
|
||||
"rsync": {"exit_code": 24},
|
||||
}
|
||||
|
||||
count = Command()._run_once(prefix=Path(tmp) / "home")
|
||||
|
||||
self.assertEqual(count, 1)
|
||||
run.refresh_from_db()
|
||||
self.assertEqual(run.status, BackupRun.Status.WARNING)
|
||||
self.assertEqual(run.rsync_exit_code, 24)
|
||||
self.assertEqual(run.result["warning"]["category"], "vanished")
|
||||
|
||||
def test_worker_refreshes_heartbeat_while_run_is_active(self) -> None:
|
||||
with TemporaryDirectory() as tmp:
|
||||
GlobalConfig.objects.create(name="default", backup_root=str(Path(tmp) / "backups"))
|
||||
@@ -116,6 +147,44 @@ class BackupWorkerTests(TestCase):
|
||||
run_scheduled.side_effect = fake_run_scheduled
|
||||
Command()._run_once(prefix=Path(tmp) / "home")
|
||||
|
||||
def test_worker_records_real_run_log_path_while_running(self) -> None:
|
||||
with TemporaryDirectory() as tmp:
|
||||
GlobalConfig.objects.create(name="default", backup_root=str(Path(tmp) / "backups"))
|
||||
host = HostConfig.objects.create(host="web-01", address="web-01.example.test")
|
||||
run = queue_backup_run(host=host)
|
||||
snapshot_dir = Path(tmp) / "backups" / host.host / ".incomplete" / "20260519-021500Z__ABCDEFGH"
|
||||
log_path = snapshot_dir / "meta" / "rsync.log"
|
||||
|
||||
with patch("pobsync_backend.backup_runner.run_scheduled") as run_scheduled:
|
||||
def fake_run_scheduled(**kwargs):
|
||||
kwargs["state_callback"](
|
||||
{
|
||||
"status": "running",
|
||||
"phase": "rsync",
|
||||
"snapshot": str(snapshot_dir),
|
||||
"log": str(log_path),
|
||||
"rsync": {"command": ["rsync"], "exit_code": None, "pid": 1234, "pgid": 1234},
|
||||
}
|
||||
)
|
||||
run.refresh_from_db()
|
||||
self.assertEqual(run.snapshot_path, str(snapshot_dir))
|
||||
self.assertEqual(run.result["execution"]["phase"], "rsync")
|
||||
self.assertEqual(run.result["execution"]["log"], str(log_path))
|
||||
self.assertEqual(run.result["execution"]["snapshot"], str(snapshot_dir))
|
||||
self.assertEqual(run.result["rsync"]["command"], ["rsync"])
|
||||
self.assertEqual(run.result["rsync"]["pid"], 1234)
|
||||
return {
|
||||
"ok": True,
|
||||
"dry_run": False,
|
||||
"host": host.host,
|
||||
"snapshot": "",
|
||||
"base": None,
|
||||
"rsync": {"exit_code": 0},
|
||||
}
|
||||
|
||||
run_scheduled.side_effect = fake_run_scheduled
|
||||
Command()._run_once(prefix=Path(tmp) / "home")
|
||||
|
||||
def test_worker_reconciles_stale_real_run_after_heartbeat_timeout(self) -> None:
|
||||
host = HostConfig.objects.create(host="web-01", address="web-01.example.test")
|
||||
run = queue_backup_run(host=host)
|
||||
@@ -136,6 +205,97 @@ class BackupWorkerTests(TestCase):
|
||||
self.assertEqual(run.result["failure"]["category"], "worker")
|
||||
self.assertIn("heartbeat stopped", run.result["failure"]["message"])
|
||||
|
||||
def test_worker_reconciles_real_run_with_terminal_broken_pipe_log(self) -> None:
|
||||
with TemporaryDirectory() as tmp:
|
||||
host = HostConfig.objects.create(host="web-01", address="web-01.example.test")
|
||||
run = queue_backup_run(host=host)
|
||||
log_path = Path(tmp) / "backups" / host.host / ".incomplete" / "20260519-021500Z__ABCDEFGH" / "meta" / "rsync.log"
|
||||
log_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
log_path.write_text(
|
||||
"rsync error: unexplained error (code 255) at rsync.c(716) [generator=3.4.1]\n"
|
||||
"rsync error: received SIGUSR1 (code 19) at main.c(1600) [receiver=3.4.1]\n"
|
||||
"rsync: [generator] write error: Broken pipe (32)\n",
|
||||
encoding="utf-8",
|
||||
)
|
||||
run.status = BackupRun.Status.RUNNING
|
||||
run.started_at = timezone.now()
|
||||
run.result["execution"] = {"log": str(log_path)}
|
||||
run.save(update_fields=["status", "started_at", "result"])
|
||||
|
||||
reconciled = reconcile_running_runs()
|
||||
|
||||
self.assertEqual(reconciled, 1)
|
||||
run.refresh_from_db()
|
||||
self.assertEqual(run.status, BackupRun.Status.FAILED)
|
||||
self.assertEqual(run.rsync_exit_code, 255)
|
||||
self.assertEqual(run.result["failure"]["category"], "transport")
|
||||
self.assertIn("Broken pipe", "\n".join(run.result["rsync"]["log_tail"]))
|
||||
|
||||
def test_worker_reconciles_real_run_when_rsync_process_disappears(self) -> None:
|
||||
with TemporaryDirectory() as tmp:
|
||||
host = HostConfig.objects.create(host="web-01", address="web-01.example.test")
|
||||
run = queue_backup_run(host=host)
|
||||
log_path = Path(tmp) / "backups" / host.host / ".incomplete" / "20260519-021500Z__ABCDEFGH" / "meta" / "rsync.log"
|
||||
log_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
log_path.write_text("sending incremental file list\n", encoding="utf-8")
|
||||
run.status = BackupRun.Status.RUNNING
|
||||
run.started_at = timezone.now() - timedelta(minutes=10)
|
||||
run.result["execution"] = {
|
||||
"phase": "rsync",
|
||||
"log": str(log_path),
|
||||
"heartbeat_at": (timezone.now() - timedelta(minutes=10)).isoformat(),
|
||||
}
|
||||
run.result["rsync"] = {"pid": 999999, "pgid": 999999, "command": ["rsync"]}
|
||||
run.save(update_fields=["status", "started_at", "result"])
|
||||
|
||||
reconciled = reconcile_running_runs(grace_seconds=300, stale_worker_seconds=24 * 60 * 60)
|
||||
|
||||
self.assertEqual(reconciled, 1)
|
||||
run.refresh_from_db()
|
||||
self.assertEqual(run.status, BackupRun.Status.FAILED)
|
||||
self.assertEqual(run.result["failure"]["category"], "rsync_process")
|
||||
self.assertEqual(run.rsync_exit_code, 255)
|
||||
|
||||
def test_worker_does_not_reconcile_missing_rsync_process_during_finalizing_phase(self) -> None:
|
||||
host = HostConfig.objects.create(host="web-01", address="web-01.example.test")
|
||||
run = queue_backup_run(host=host)
|
||||
run.status = BackupRun.Status.RUNNING
|
||||
run.started_at = timezone.now() - timedelta(minutes=10)
|
||||
run.result["execution"] = {
|
||||
"phase": "finalizing",
|
||||
"heartbeat_at": (timezone.now() - timedelta(minutes=10)).isoformat(),
|
||||
}
|
||||
run.result["rsync"] = {"pid": 999999, "pgid": 999999, "command": ["rsync"], "exit_code": 0}
|
||||
run.save(update_fields=["status", "started_at", "result"])
|
||||
|
||||
reconciled = reconcile_running_runs(grace_seconds=300, stale_worker_seconds=24 * 60 * 60)
|
||||
|
||||
self.assertEqual(reconciled, 0)
|
||||
run.refresh_from_db()
|
||||
self.assertEqual(run.status, BackupRun.Status.RUNNING)
|
||||
|
||||
def test_worker_does_not_fail_real_run_for_vanished_file_warning_log(self) -> None:
|
||||
with TemporaryDirectory() as tmp:
|
||||
host = HostConfig.objects.create(host="web-01", address="web-01.example.test")
|
||||
run = queue_backup_run(host=host)
|
||||
log_path = Path(tmp) / "backups" / host.host / ".incomplete" / "20260519-021500Z__ABCDEFGH" / "meta" / "rsync.log"
|
||||
log_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
log_path.write_text(
|
||||
"file has vanished: \"/var/lib/app/session\"\n"
|
||||
"rsync warning: some files vanished before they could be transferred (code 24) at main.c(1338) [sender=3.4.1]\n",
|
||||
encoding="utf-8",
|
||||
)
|
||||
run.status = BackupRun.Status.RUNNING
|
||||
run.started_at = timezone.now()
|
||||
run.result["execution"] = {"log": str(log_path)}
|
||||
run.save(update_fields=["status", "started_at", "result"])
|
||||
|
||||
reconciled = reconcile_running_runs()
|
||||
|
||||
self.assertEqual(reconciled, 0)
|
||||
run.refresh_from_db()
|
||||
self.assertEqual(run.status, BackupRun.Status.RUNNING)
|
||||
|
||||
def test_worker_records_dry_run_log_path_while_running(self) -> None:
|
||||
with TemporaryDirectory() as tmp:
|
||||
GlobalConfig.objects.create(name="default", backup_root=str(Path(tmp) / "backups"))
|
||||
|
||||
@@ -256,6 +256,71 @@ class RunScheduledConfigSourceTests(SimpleTestCase):
|
||||
self.assertIn("stats:", meta_text)
|
||||
self.assertIn("files_total: 10", meta_text)
|
||||
|
||||
def test_real_run_reports_running_state_callback_before_rsync_returns(self) -> None:
|
||||
states = []
|
||||
|
||||
def fake_run_rsync(command, log_path, timeout_seconds, cancel_check=None, process_started=None):
|
||||
self.assertEqual(len(states), 1)
|
||||
self.assertEqual(states[0]["status"], "running")
|
||||
self.assertEqual(states[0]["phase"], "preparing")
|
||||
self.assertEqual(states[0]["log"], str(log_path))
|
||||
self.assertEqual(states[0]["rsync"]["command"], command)
|
||||
self.assertIsNotNone(process_started)
|
||||
process_started(1234, 1234)
|
||||
self.assertEqual(len(states), 2)
|
||||
self.assertEqual(states[1]["phase"], "rsync")
|
||||
self.assertEqual(states[1]["rsync"]["pid"], 1234)
|
||||
self.assertEqual(states[1]["rsync"]["pgid"], 1234)
|
||||
log_path.write_text("Number of files: 1\n", encoding="utf-8")
|
||||
return RsyncResult(exit_code=0, command=command)
|
||||
|
||||
with TemporaryDirectory() as tmp:
|
||||
with patch("pobsync.commands.run_scheduled.run_rsync", side_effect=fake_run_rsync):
|
||||
run_scheduled(
|
||||
prefix=Path(tmp) / "home",
|
||||
host="web-01",
|
||||
dry_run=False,
|
||||
config_source=FakeConfigSource(backup_root=str(Path(tmp) / "backups")),
|
||||
state_callback=states.append,
|
||||
)
|
||||
|
||||
self.assertEqual(len(states), 3)
|
||||
self.assertIn("/.incomplete/", states[0]["snapshot"])
|
||||
self.assertEqual(states[2]["phase"], "finalizing")
|
||||
self.assertEqual(states[2]["rsync"]["exit_code"], 0)
|
||||
|
||||
def test_real_run_keeps_snapshot_with_warning_for_vanished_files(self) -> None:
|
||||
def fake_run_rsync(command, log_path, timeout_seconds, cancel_check=None):
|
||||
log_path.write_text(
|
||||
"file has vanished: \"/var/lib/app/session\"\n"
|
||||
"rsync warning: some files vanished before they could be transferred (code 24) at main.c(1338) [sender=3.4.1]\n",
|
||||
encoding="utf-8",
|
||||
)
|
||||
data_dir = log_path.parent.parent / "data"
|
||||
data_dir.mkdir(parents=True, exist_ok=True)
|
||||
(data_dir / "payload.txt").write_text("payload", encoding="utf-8")
|
||||
return RsyncResult(exit_code=24, command=command)
|
||||
|
||||
with TemporaryDirectory() as tmp:
|
||||
backup_root = Path(tmp) / "backups"
|
||||
with patch("pobsync.commands.run_scheduled.run_rsync", side_effect=fake_run_rsync):
|
||||
result = run_scheduled(
|
||||
prefix=Path(tmp) / "home",
|
||||
host="web-01",
|
||||
dry_run=False,
|
||||
config_source=FakeConfigSource(backup_root=str(backup_root)),
|
||||
)
|
||||
|
||||
snapshot = Path(result["snapshot"])
|
||||
self.assertTrue((snapshot / "data" / "payload.txt").exists())
|
||||
|
||||
self.assertTrue(result["ok"])
|
||||
self.assertEqual(result["status"], "warning")
|
||||
self.assertEqual(result["rsync"]["exit_code"], 24)
|
||||
self.assertEqual(result["warning"]["category"], "vanished")
|
||||
self.assertEqual(snapshot.parent.name, "scheduled")
|
||||
self.assertIn("file has vanished", "\n".join(result["rsync"]["log_tail"]))
|
||||
|
||||
def test_dry_run_reports_cancelled_rsync(self) -> None:
|
||||
def fake_run_rsync(command, log_path, timeout_seconds, cancel_check=None):
|
||||
self.assertTrue(cancel_check())
|
||||
|
||||
Reference in New Issue
Block a user