issue-54-worker-rsync-state #56

Merged
parkel merged 3 commits from issue-54-worker-rsync-state into master 2026-05-23 00:39:47 +02:00
5 changed files with 146 additions and 4 deletions
Showing only changes of commit 28da9c4096 - Show all commits

View File

@@ -346,16 +346,46 @@ def run_scheduled(
state_callback(
{
"status": "running",
"phase": "preparing",
"snapshot": str(incomplete_dir),
"log": str(log_path),
"rsync": {"command": cmd, "exit_code": None},
}
)
result = run_rsync(cmd, log_path=log_path, timeout_seconds=timeout_seconds, cancel_check=cancel_check)
def process_started(pid: int, pgid: int) -> None:
if state_callback is None:
return
state_callback(
{
"status": "running",
"phase": "rsync",
"snapshot": str(incomplete_dir),
"log": str(log_path),
"rsync": {"command": cmd, "exit_code": None, "pid": pid, "pgid": pgid},
}
)
result = run_rsync(
cmd,
log_path=log_path,
timeout_seconds=timeout_seconds,
cancel_check=cancel_check,
process_started=process_started if state_callback is not None else None,
)
log_tail = _read_log_tail(log_path)
warning = classify_rsync_warning(result.exit_code, log_tail)
successful_or_warning = result.exit_code == 0 or warning is not None
if state_callback is not None:
state_callback(
{
"status": "running",
"phase": "finalizing",
"snapshot": str(incomplete_dir),
"log": str(log_path),
"rsync": {"command": cmd, "exit_code": result.exit_code, "log_tail": log_tail},
}
)
end_ts = utc_now()
meta["ended_at"] = format_iso_z(end_ts)

View File

@@ -82,6 +82,7 @@ def run_rsync(
log_path: Path,
timeout_seconds: int,
cancel_check: Callable[[], bool] | None = None,
process_started: Callable[[int, int], None] | None = None,
) -> RsyncResult:
"""
Run rsync and always write stdout/stderr to log_path.
@@ -95,6 +96,8 @@ def run_rsync(
with log_path.open("ab") as f:
process = subprocess.Popen(command, stdout=f, stderr=subprocess.STDOUT, start_new_session=True)
if process_started is not None:
process_started(process.pid, os.getpgid(process.pid))
started = time.monotonic()
while True:
exit_code = process.poll()

View File

@@ -225,6 +225,9 @@ def _record_running_state(run_id: int, state: dict[str, object]) -> None:
log_path = state.get("log")
snapshot_path = state.get("snapshot")
phase = state.get("phase")
if isinstance(phase, str) and phase:
execution["phase"] = phase
if isinstance(log_path, str) and log_path:
execution["log"] = log_path
if isinstance(snapshot_path, str) and snapshot_path:
@@ -275,6 +278,30 @@ def _reconcile_running_run(*, run: BackupRun, grace_seconds: int, stale_worker_s
run.result = result
run.save(update_fields=["status", "ended_at", "rsync_exit_code", "result"])
return True
if _running_rsync_process_missing(run=run, grace_seconds=grace_seconds):
result.update(
{
"ok": False,
"host": run.host.host,
"log": str(log_path) if log_path else "",
"failure": {
"category": "rsync_process",
"message": "The rsync process is no longer running while the backup is still marked running.",
"hint": "Check the rsync log and pobsync-worker.service logs before retrying the backup.",
},
"rsync": {
**(result.get("rsync") if isinstance(result.get("rsync"), dict) else {}),
"exit_code": exit_code or 255,
"log_tail": log_tail,
},
}
)
run.status = BackupRun.Status.FAILED
run.ended_at = timezone.now()
run.rsync_exit_code = exit_code or 255
run.result = result
run.save(update_fields=["status", "ended_at", "rsync_exit_code", "result"])
return True
if stale_worker:
result.update(
{
@@ -413,6 +440,33 @@ def _running_worker_timed_out(*, run: BackupRun, stale_worker_seconds: int) -> b
return timezone.now() >= heartbeat_at + timedelta(seconds=stale_worker_seconds)
def _running_rsync_process_missing(*, run: BackupRun, grace_seconds: int) -> bool:
    """
    Report whether a run in the "rsync" phase has lost its recorded rsync process.

    Returns True only when every condition below holds, and False otherwise:

    * grace_seconds is positive,
    * the stored result's execution phase is "rsync",
    * a positive integer pid is recorded under result["rsync"]["pid"],
    * the last heartbeat (or, when none can be parsed, run.started_at) is at
      least grace_seconds in the past,
    * _process_exists() reports that the pid is gone.
    """
    if grace_seconds <= 0:
        return False

    # Treat a non-dict result (or non-dict sub-sections) as empty.
    stored = run.result if isinstance(run.result, dict) else {}

    execution_info = stored.get("execution")
    if not isinstance(execution_info, dict):
        execution_info = {}
    if execution_info.get("phase") != "rsync":
        return False

    rsync_info = stored.get("rsync")
    if not isinstance(rsync_info, dict):
        rsync_info = {}
    rsync_pid = rsync_info.get("pid")
    if not (isinstance(rsync_pid, int) and rsync_pid > 0):
        return False

    # Only probe the process once the last sign of life is older than the grace period.
    last_seen = _parse_iso_datetime(execution_info.get("heartbeat_at")) or run.started_at
    if last_seen is None:
        return False
    if timezone.now() < last_seen + timedelta(seconds=grace_seconds):
        return False

    return not _process_exists(rsync_pid)
def _process_exists(pid: int) -> bool:
    """
    Return True if a process with the given pid currently exists.

    The check sends signal 0 via os.kill(), which performs the existence and
    permission checks without delivering a signal.

    Returns False for non-positive pids and for values too large to be a pid.
    """
    # kill() interprets pid 0 and negative pids as process-group / broadcast
    # targets, so they never identify a single process and must not be probed.
    if pid <= 0:
        return False
    try:
        os.kill(pid, 0)
    except ProcessLookupError:
        return False
    except PermissionError:
        # The process exists but we are not allowed to signal it.
        return True
    except OverflowError:
        # The value does not fit the platform's pid type, so no such process can exist.
        return False
    return True
def _parse_iso_datetime(value: object):
if not isinstance(value, str) or not value:
return None

View File

@@ -160,16 +160,19 @@ class BackupWorkerTests(TestCase):
kwargs["state_callback"](
{
"status": "running",
"phase": "rsync",
"snapshot": str(snapshot_dir),
"log": str(log_path),
"rsync": {"command": ["rsync"], "exit_code": None},
"rsync": {"command": ["rsync"], "exit_code": None, "pid": 1234, "pgid": 1234},
}
)
run.refresh_from_db()
self.assertEqual(run.snapshot_path, str(snapshot_dir))
self.assertEqual(run.result["execution"]["phase"], "rsync")
self.assertEqual(run.result["execution"]["log"], str(log_path))
self.assertEqual(run.result["execution"]["snapshot"], str(snapshot_dir))
self.assertEqual(run.result["rsync"]["command"], ["rsync"])
self.assertEqual(run.result["rsync"]["pid"], 1234)
return {
"ok": True,
"dry_run": False,
@@ -228,6 +231,49 @@ class BackupWorkerTests(TestCase):
self.assertEqual(run.result["failure"]["category"], "transport")
self.assertIn("Broken pipe", "\n".join(run.result["rsync"]["log_tail"]))
def test_worker_reconciles_real_run_when_rsync_process_disappears(self) -> None:
    """A run in the rsync phase whose recorded rsync pid is not running gets marked failed."""
    with TemporaryDirectory() as tmp:
        host = HostConfig.objects.create(host="web-01", address="web-01.example.test")
        run = queue_backup_run(host=host)

        # Put an rsync log on disk at the path recorded in the run's execution state.
        meta_dir = Path(tmp) / "backups" / host.host / ".incomplete" / "20260519-021500Z__ABCDEFGH" / "meta"
        meta_dir.mkdir(parents=True, exist_ok=True)
        log_path = meta_dir / "rsync.log"
        log_path.write_text("sending incremental file list\n", encoding="utf-8")

        # Running for ten minutes with an equally old heartbeat: past the 300 s grace below.
        run.status = BackupRun.Status.RUNNING
        run.started_at = timezone.now() - timedelta(minutes=10)
        stale_heartbeat = timezone.now() - timedelta(minutes=10)
        run.result["execution"] = {
            "phase": "rsync",
            "log": str(log_path),
            "heartbeat_at": stale_heartbeat.isoformat(),
        }
        # NOTE(review): pid 999999 is presumably chosen because no such process is running.
        run.result["rsync"] = {"pid": 999999, "pgid": 999999, "command": ["rsync"]}
        run.save(update_fields=["status", "started_at", "result"])

        reconciled = reconcile_running_runs(grace_seconds=300, stale_worker_seconds=24 * 60 * 60)

        self.assertEqual(reconciled, 1)
        run.refresh_from_db()
        self.assertEqual(run.status, BackupRun.Status.FAILED)
        self.assertEqual(run.result["failure"]["category"], "rsync_process")
        self.assertEqual(run.rsync_exit_code, 255)
def test_worker_does_not_reconcile_missing_rsync_process_during_finalizing_phase(self) -> None:
    """A run in the finalizing phase stays RUNNING; the recorded rsync pid is not used to fail it."""
    host = HostConfig.objects.create(host="web-01", address="web-01.example.test")
    run = queue_backup_run(host=host)

    # Same stale timing as a reconcilable run, but the phase is "finalizing" rather than "rsync".
    run.status = BackupRun.Status.RUNNING
    run.started_at = timezone.now() - timedelta(minutes=10)
    stale_heartbeat = timezone.now() - timedelta(minutes=10)
    run.result["execution"] = {
        "phase": "finalizing",
        "heartbeat_at": stale_heartbeat.isoformat(),
    }
    run.result["rsync"] = {"pid": 999999, "pgid": 999999, "command": ["rsync"], "exit_code": 0}
    run.save(update_fields=["status", "started_at", "result"])

    reconciled = reconcile_running_runs(grace_seconds=300, stale_worker_seconds=24 * 60 * 60)

    self.assertEqual(reconciled, 0)
    run.refresh_from_db()
    self.assertEqual(run.status, BackupRun.Status.RUNNING)
def test_worker_does_not_fail_real_run_for_vanished_file_warning_log(self) -> None:
with TemporaryDirectory() as tmp:
host = HostConfig.objects.create(host="web-01", address="web-01.example.test")

View File

@@ -259,11 +259,18 @@ class RunScheduledConfigSourceTests(SimpleTestCase):
def test_real_run_reports_running_state_callback_before_rsync_returns(self) -> None:
states = []
def fake_run_rsync(command, log_path, timeout_seconds, cancel_check=None):
def fake_run_rsync(command, log_path, timeout_seconds, cancel_check=None, process_started=None):
self.assertEqual(len(states), 1)
self.assertEqual(states[0]["status"], "running")
self.assertEqual(states[0]["phase"], "preparing")
self.assertEqual(states[0]["log"], str(log_path))
self.assertEqual(states[0]["rsync"]["command"], command)
self.assertIsNotNone(process_started)
process_started(1234, 1234)
self.assertEqual(len(states), 2)
self.assertEqual(states[1]["phase"], "rsync")
self.assertEqual(states[1]["rsync"]["pid"], 1234)
self.assertEqual(states[1]["rsync"]["pgid"], 1234)
log_path.write_text("Number of files: 1\n", encoding="utf-8")
return RsyncResult(exit_code=0, command=command)
@@ -277,8 +284,10 @@ class RunScheduledConfigSourceTests(SimpleTestCase):
state_callback=states.append,
)
self.assertEqual(len(states), 1)
self.assertEqual(len(states), 3)
self.assertIn("/.incomplete/", states[0]["snapshot"])
self.assertEqual(states[2]["phase"], "finalizing")
self.assertEqual(states[2]["rsync"]["exit_code"], 0)
def test_real_run_keeps_snapshot_with_warning_for_vanished_files(self) -> None:
def fake_run_rsync(command, log_path, timeout_seconds, cancel_check=None):