From fe8e65e12efa7bdd64b2e6ebe0fffae87809afa6 Mon Sep 17 00:00:00 2001 From: Peter van Arkel Date: Tue, 19 May 2026 13:00:12 +0200 Subject: [PATCH] (feature) add queued backup worker foundation Move backup execution out of the management command into a reusable backup runner service that can execute an existing BackupRun record. Add queue primitives and a run_pobsync_worker command so manual backup requests can be recorded as queued SQL state and processed outside the web request path. Add a worker Docker service and pobsync worker CLI alias, with tests for queued run creation, worker execution, manual run typing, and command mapping. --- README.md | 6 +- docker-compose.yml | 35 +++++ src/pobsync/cli.py | 1 + src/pobsync_backend/backup_runner.py | 148 ++++++++++++++++++ .../management/commands/run_pobsync_backup.py | 95 ++--------- .../management/commands/run_pobsync_worker.py | 52 ++++++ .../tests/test_backup_worker.py | 70 +++++++++ .../tests/test_console_entrypoint.py | 7 + .../tests/test_run_backup_records_snapshot.py | 39 ++++- 9 files changed, 361 insertions(+), 92 deletions(-) create mode 100644 src/pobsync_backend/backup_runner.py create mode 100644 src/pobsync_backend/management/commands/run_pobsync_worker.py create mode 100644 src/pobsync_backend/tests/test_backup_worker.py diff --git a/README.md b/README.md index e25bc71..65dc5f1 100644 --- a/README.md +++ b/README.md @@ -125,7 +125,7 @@ This starts Django on: Run the scheduler alongside the web admin: ``` -docker compose up --build web scheduler +docker compose up --build web scheduler worker ``` The container persists `/opt/pobsync` and the SQLite database in Docker volumes. @@ -133,7 +133,7 @@ Backup data is mounted at `/backups` inside the containers. By default this uses Override it with `POBSYNC_BACKUP_ROOT`: ``` -POBSYNC_BACKUP_ROOT=/mnt/backups/pobsync docker compose up --build web scheduler +POBSYNC_BACKUP_ROOT=/mnt/backups/pobsync docker compose up --build web scheduler worker ``` In the Django global config, set the backup root to `/backups` when running in Docker. For local, non-Docker use, @@ -148,7 +148,7 @@ docker compose --profile mariadb up --build web-mariadb With the scheduler: ``` -docker compose --profile mariadb up --build web-mariadb scheduler-mariadb +docker compose --profile mariadb up --build web-mariadb scheduler-mariadb worker-mariadb ``` SQLite remains the default because it is enough for a single backup server and keeps deployment simple. diff --git a/docker-compose.yml b/docker-compose.yml index f3cee44..5ed7c5e 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -29,6 +29,20 @@ services: - pobsync_db:/var/lib/pobsync - ${POBSYNC_BACKUP_ROOT:-./backups}:/backups + worker: + build: . + command: python manage.py run_pobsync_worker --loop --interval 15 + environment: + POBSYNC_DJANGO_DEBUG: "1" + POBSYNC_DJANGO_SECRET_KEY: "dev-only-change-me" + POBSYNC_DJANGO_ALLOWED_HOSTS: "localhost,127.0.0.1,0.0.0.0" + POBSYNC_HOME: "/opt/pobsync" + POBSYNC_SQLITE_PATH: "/var/lib/pobsync/pobsync.sqlite3" + volumes: + - pobsync_state:/opt/pobsync + - pobsync_db:/var/lib/pobsync + - ${POBSYNC_BACKUP_ROOT:-./backups}:/backups + web-mariadb: profiles: ["mariadb"] build: . @@ -73,6 +87,27 @@ services: - pobsync_state:/opt/pobsync - ${POBSYNC_BACKUP_ROOT:-./backups}:/backups + worker-mariadb: + profiles: ["mariadb"] + build: . + command: python manage.py run_pobsync_worker --loop --interval 15 + environment: + POBSYNC_DJANGO_DEBUG: "1" + POBSYNC_DJANGO_SECRET_KEY: "dev-only-change-me" + POBSYNC_DJANGO_ALLOWED_HOSTS: "localhost,127.0.0.1,0.0.0.0" + POBSYNC_HOME: "/opt/pobsync" + POBSYNC_DB_ENGINE: "mariadb" + POBSYNC_DB_HOST: "db" + POBSYNC_DB_NAME: "pobsync" + POBSYNC_DB_USER: "pobsync" + POBSYNC_DB_PASSWORD: "pobsync" + depends_on: + db: + condition: service_healthy + volumes: + - pobsync_state:/opt/pobsync + - ${POBSYNC_BACKUP_ROOT:-./backups}:/backups + db: profiles: ["mariadb"] image: mariadb:11 diff --git a/src/pobsync/cli.py b/src/pobsync/cli.py index 4595c25..6bbdca3 100644 --- a/src/pobsync/cli.py +++ b/src/pobsync/cli.py @@ -15,6 +15,7 @@ COMMAND_ALIASES = { "retention": "run_pobsync_retention", "discover-snapshots": "discover_pobsync_snapshots", "scheduler": "run_pobsync_scheduler", + "worker": "run_pobsync_worker", } diff --git a/src/pobsync_backend/backup_runner.py b/src/pobsync_backend/backup_runner.py new file mode 100644 index 0000000..0ed54a4 --- /dev/null +++ b/src/pobsync_backend/backup_runner.py @@ -0,0 +1,148 @@ +from __future__ import annotations + +from pathlib import Path + +from django.db import transaction +from django.utils import timezone + +from pobsync.commands.run_scheduled import run_scheduled +from pobsync_backend.config_source import DjangoConfigSource +from pobsync_backend.models import BackupRun, HostConfig +from pobsync_backend.retention import run_sql_retention_apply +from pobsync_backend.snapshot_discovery import infer_snapshot_kind, upsert_snapshot_record + + +def queue_backup_run( + *, + host: HostConfig, + run_type: str = BackupRun.RunType.MANUAL, + dry_run: bool = False, + prune: bool = False, + prune_max_delete: int = 10, + prune_protect_bases: bool = False, +) -> BackupRun: + return BackupRun.objects.create( + host=host, + run_type=run_type, + status=BackupRun.Status.QUEUED, + result={ + "requested": { + "dry_run": bool(dry_run), + "prune": bool(prune), + "prune_max_delete": int(prune_max_delete), + "prune_protect_bases": bool(prune_protect_bases), + } + }, + ) + + +def execute_backup_run( + *, + run: BackupRun, + prefix: Path, + dry_run: bool = False, + prune: bool = False, + prune_max_delete: int = 10, + prune_protect_bases: bool = False, +) -> BackupRun: + run.status = BackupRun.Status.RUNNING + run.started_at = run.started_at or timezone.now() + run.save(update_fields=["status", "started_at"]) + + try: + result = run_scheduled( + prefix=prefix, + host=run.host.host, + dry_run=bool(dry_run), + prune=False, + config_source=DjangoConfigSource(), + ) + except Exception as exc: + run.status = BackupRun.Status.FAILED + run.ended_at = timezone.now() + run.result = {"ok": False, "error": str(exc), "type": type(exc).__name__} + run.save(update_fields=["status", "ended_at", "result"]) + raise + + run.status = BackupRun.Status.SUCCESS if result.get("ok") else BackupRun.Status.FAILED + run.ended_at = timezone.now() + run.snapshot_path = str(result.get("snapshot") or "") + run.base_path = str(result.get("base") or "") + rsync = result.get("rsync") if isinstance(result.get("rsync"), dict) else {} + run.rsync_exit_code = rsync.get("exit_code") + run.result = result + snapshot_record = None + if run.snapshot_path: + snapshot_path = Path(run.snapshot_path) + try: + kind = infer_snapshot_kind(snapshot_path) + snapshot_record, _created = upsert_snapshot_record(host=run.host, kind=kind, snapshot_dir=snapshot_path) + except ValueError: + snapshot_record = None + + if result.get("ok") and not result.get("dry_run") and prune: + try: + result["prune"] = run_sql_retention_apply( + prefix=prefix, + host=run.host.host, + kind="scheduled", + protect_bases=bool(prune_protect_bases), + yes=True, + max_delete=int(prune_max_delete), + acquire_lock=False, + ) + except Exception as exc: + result["prune"] = {"ok": False, "error": str(exc), "type": type(exc).__name__} + run.status = BackupRun.Status.FAILED + run.result = result + run.snapshot = snapshot_record + run.save( + update_fields=[ + "status", + "ended_at", + "snapshot_path", + "snapshot", + "base_path", + "rsync_exit_code", + "result", + ], + ) + raise + + run.snapshot = snapshot_record + run.result = result + run.save( + update_fields=[ + "status", + "ended_at", + "snapshot_path", + "snapshot", + "base_path", + "rsync_exit_code", + "result", + ], + ) + return run + + +def claim_next_queued_run() -> BackupRun | None: + with transaction.atomic(): + run = ( + BackupRun.objects.select_related("host") + .filter(status=BackupRun.Status.QUEUED, host__enabled=True) + .order_by("created_at", "id") + .first() + ) + if run is None: + return None + run.status = BackupRun.Status.RUNNING + run.started_at = timezone.now() + run.save(update_fields=["status", "started_at"]) + return run + + +def requested_options(run: BackupRun) -> dict[str, object]: + requested = run.result.get("requested") if isinstance(run.result, dict) else None + if not isinstance(requested, dict): + return {} + return requested diff --git a/src/pobsync_backend/management/commands/run_pobsync_backup.py b/src/pobsync_backend/management/commands/run_pobsync_backup.py index 485438e..d54fcee 100644 --- a/src/pobsync_backend/management/commands/run_pobsync_backup.py +++ b/src/pobsync_backend/management/commands/run_pobsync_backup.py @@ -5,18 +5,14 @@ from typing import Any from django.conf import settings from django.core.management.base import BaseCommand, CommandError -from django.utils import timezone -from pobsync.commands.run_scheduled import run_scheduled from pobsync.paths import PobsyncPaths -from pobsync_backend.config_source import DjangoConfigSource +from pobsync_backend.backup_runner import execute_backup_run from pobsync_backend.models import BackupRun, HostConfig -from pobsync_backend.retention import run_sql_retention_apply -from pobsync_backend.snapshot_discovery import infer_snapshot_kind, upsert_snapshot_record class Command(BaseCommand): - help = "Run a scheduled pobsync backup and record the result in Django." + help = "Run a pobsync backup and record the result in Django." def add_arguments(self, parser) -> None: parser.add_argument("host", help="Host to back up") @@ -25,6 +21,7 @@ class Command(BaseCommand): parser.add_argument("--prune", action="store_true", help="Apply retention after a successful run") parser.add_argument("--prune-max-delete", type=int, default=10) parser.add_argument("--prune-protect-bases", action="store_true") + parser.add_argument("--manual", action="store_true", help="Record the run as manual instead of scheduled") def handle(self, *args: Any, **options: Any) -> None: host_name = options["host"] @@ -36,86 +33,20 @@ class Command(BaseCommand): run = BackupRun.objects.create( host=host, - run_type=BackupRun.RunType.SCHEDULED, + run_type=BackupRun.RunType.MANUAL if options["manual"] else BackupRun.RunType.SCHEDULED, status=BackupRun.Status.RUNNING, - started_at=timezone.now(), ) - - try: - result = run_scheduled( - prefix=paths.home, - host=host.host, - dry_run=bool(options["dry_run"]), - prune=False, - config_source=DjangoConfigSource(), - ) - except Exception as exc: - run.status = BackupRun.Status.FAILED - run.ended_at = timezone.now() - run.result = {"ok": False, "error": str(exc), "type": type(exc).__name__} - run.save(update_fields=["status", "ended_at", "result"]) - raise - - run.status = BackupRun.Status.SUCCESS if result.get("ok") else BackupRun.Status.FAILED - run.ended_at = timezone.now() - run.snapshot_path = str(result.get("snapshot") or "") - run.base_path = str(result.get("base") or "") - rsync = result.get("rsync") if isinstance(result.get("rsync"), dict) else {} - run.rsync_exit_code = rsync.get("exit_code") - run.result = result - snapshot_record = None - if run.snapshot_path: - snapshot_path = Path(run.snapshot_path) - try: - kind = infer_snapshot_kind(snapshot_path) - snapshot_record, _created = upsert_snapshot_record(host=host, kind=kind, snapshot_dir=snapshot_path) - except ValueError: - snapshot_record = None - - if result.get("ok") and not result.get("dry_run") and options["prune"]: - try: - result["prune"] = run_sql_retention_apply( - prefix=paths.home, - host=host.host, - kind="scheduled", - protect_bases=bool(options["prune_protect_bases"]), - yes=True, - max_delete=int(options["prune_max_delete"]), - acquire_lock=False, - ) - except Exception as exc: - result["prune"] = {"ok": False, "error": str(exc), "type": type(exc).__name__} - run.status = BackupRun.Status.FAILED - run.result = result - run.snapshot = snapshot_record - run.save( - update_fields=[ - "status", - "ended_at", - "snapshot_path", - "snapshot", - "base_path", - "rsync_exit_code", - "result", - ], - ) - raise - - run.snapshot = snapshot_record - run.result = result - run.save( - update_fields=[ - "status", - "ended_at", - "snapshot_path", - "snapshot", - "base_path", - "rsync_exit_code", - "result", - ], + execute_backup_run( + run=run, + prefix=paths.home, + dry_run=bool(options["dry_run"]), + prune=bool(options["prune"]), + prune_max_delete=int(options["prune_max_delete"]), + prune_protect_bases=bool(options["prune_protect_bases"]), ) + run.refresh_from_db() - if result.get("ok"): + if run.status == BackupRun.Status.SUCCESS: self.stdout.write(self.style.SUCCESS(f"Backup completed for {host.host}.")) return diff --git a/src/pobsync_backend/management/commands/run_pobsync_worker.py b/src/pobsync_backend/management/commands/run_pobsync_worker.py new file mode 100644 index 0000000..848786c --- /dev/null +++ b/src/pobsync_backend/management/commands/run_pobsync_worker.py @@ -0,0 +1,52 @@ +from __future__ import annotations + +import time +from pathlib import Path +from typing import Any + +from django.conf import settings +from django.core.management.base import BaseCommand + +from pobsync.paths import PobsyncPaths +from pobsync_backend.backup_runner import claim_next_queued_run, execute_backup_run, requested_options + + +class Command(BaseCommand): + help = "Run queued pobsync backup jobs from the Django database." + + def add_arguments(self, parser) -> None: + parser.add_argument("--prefix", default=settings.POBSYNC_HOME, help="Pobsync home directory") + parser.add_argument("--once", action="store_true", help="Process one queued run and exit") + parser.add_argument("--loop", action="store_true", help="Keep checking for queued runs") + parser.add_argument("--interval", type=int, default=15, help="Loop interval in seconds") + + def handle(self, *args: Any, **options: Any) -> None: + if not options["once"] and not options["loop"]: + options["once"] = True + + paths = PobsyncPaths(home=Path(options["prefix"])) + while True: + count = self._run_once(prefix=paths.home) + self.stdout.write(f"Ran {count} queued backup run(s).") + if options["once"]: + return + time.sleep(max(1, int(options["interval"]))) + + def _run_once(self, *, prefix: Path) -> int: + run = claim_next_queued_run() + if run is None: + return 0 + + options = requested_options(run) + try: + execute_backup_run( + run=run, + prefix=prefix, + dry_run=bool(options.get("dry_run", False)), + prune=bool(options.get("prune", False)), + prune_max_delete=int(options.get("prune_max_delete", 10)), + prune_protect_bases=bool(options.get("prune_protect_bases", False)), + ) + except Exception as exc: + self.stderr.write(f"{run.host.host}: {type(exc).__name__}: {exc}") + return 1 diff --git a/src/pobsync_backend/tests/test_backup_worker.py b/src/pobsync_backend/tests/test_backup_worker.py new file mode 100644 index 0000000..eef53a8 --- /dev/null +++ b/src/pobsync_backend/tests/test_backup_worker.py @@ -0,0 +1,70 @@ +from __future__ import annotations + +from pathlib import Path +from tempfile import TemporaryDirectory +from unittest.mock import patch + +from django.test import TestCase + +from pobsync.util import write_yaml_atomic +from pobsync_backend.backup_runner import queue_backup_run +from pobsync_backend.management.commands.run_pobsync_worker import Command +from pobsync_backend.models import BackupRun, GlobalConfig, HostConfig, SnapshotRecord + + +class BackupWorkerTests(TestCase): + def test_queue_backup_run_records_requested_options(self) -> None: + host = HostConfig.objects.create(host="web-01", address="web-01.example.test") + + run = queue_backup_run( + host=host, + dry_run=True, + prune=True, + prune_max_delete=3, + prune_protect_bases=True, + ) + + self.assertEqual(run.status, BackupRun.Status.QUEUED) + self.assertEqual(run.run_type, BackupRun.RunType.MANUAL) + self.assertEqual( + run.result["requested"], + { + "dry_run": True, + "prune": True, + "prune_max_delete": 3, + "prune_protect_bases": True, + }, + ) + + def test_worker_executes_next_queued_run(self) -> None: + with TemporaryDirectory() as tmp: + backup_root = Path(tmp) / "backups" + GlobalConfig.objects.create(name="default", backup_root=str(backup_root)) + host = HostConfig.objects.create(host="web-01", address="web-01.example.test") + snapshot_dir = backup_root / host.host / "scheduled" / "20260519-021500Z__ABCDEFGH" + meta_dir = snapshot_dir / "meta" + meta_dir.mkdir(parents=True) + write_yaml_atomic(meta_dir / "meta.yaml", {"status": "success", "started_at": "2026-05-19T02:15:00Z"}) + run = queue_backup_run(host=host) + + with patch("pobsync_backend.backup_runner.run_scheduled") as run_scheduled: + run_scheduled.return_value = { + "ok": True, + "dry_run": False, + "host": host.host, + "snapshot": str(snapshot_dir), + "base": None, + "rsync": {"exit_code": 0}, + } + count = Command()._run_once(prefix=Path(tmp) / "home") + + self.assertEqual(count, 1) + run.refresh_from_db() + self.assertEqual(run.status, BackupRun.Status.SUCCESS) + self.assertEqual(SnapshotRecord.objects.count(), 1) + self.assertEqual(run.snapshot, SnapshotRecord.objects.get()) + + def test_worker_returns_zero_without_queued_runs(self) -> None: + count = Command()._run_once(prefix=Path("/opt/pobsync")) + + self.assertEqual(count, 0) diff --git a/src/pobsync_backend/tests/test_console_entrypoint.py b/src/pobsync_backend/tests/test_console_entrypoint.py index 434cf5b..2584582 100644 --- a/src/pobsync_backend/tests/test_console_entrypoint.py +++ b/src/pobsync_backend/tests/test_console_entrypoint.py @@ -46,3 +46,10 @@ class ConsoleEntrypointTests(SimpleTestCase): self.assertEqual(exit_code, 0) execute.assert_called_once_with(["pobsync", "discover_pobsync_snapshots", "--host", "web-01"]) + + def test_maps_worker_alias_to_django_command(self) -> None: + with patch("pobsync.cli.execute_from_command_line") as execute: + exit_code = main(["worker", "--once"]) + + self.assertEqual(exit_code, 0) + execute.assert_called_once_with(["pobsync", "run_pobsync_worker", "--once"]) diff --git a/src/pobsync_backend/tests/test_run_backup_records_snapshot.py b/src/pobsync_backend/tests/test_run_backup_records_snapshot.py index 41a6233..60c878f 100644 --- a/src/pobsync_backend/tests/test_run_backup_records_snapshot.py +++ b/src/pobsync_backend/tests/test_run_backup_records_snapshot.py @@ -32,7 +32,7 @@ class RunBackupRecordsSnapshotTests(TestCase): }, ) - with patch("pobsync_backend.management.commands.run_pobsync_backup.run_scheduled") as run_scheduled: + with patch("pobsync_backend.backup_runner.run_scheduled") as run_scheduled: run_scheduled.return_value = { "ok": True, "dry_run": False, @@ -63,9 +63,9 @@ class RunBackupRecordsSnapshotTests(TestCase): write_yaml_atomic(meta_dir / "meta.yaml", {"status": "success", "started_at": "2026-05-19T02:15:00Z"}) with ( - patch("pobsync_backend.management.commands.run_pobsync_backup.run_scheduled") as run_scheduled, + patch("pobsync_backend.backup_runner.run_scheduled") as run_scheduled, patch( - "pobsync_backend.management.commands.run_pobsync_backup.run_sql_retention_apply" + "pobsync_backend.backup_runner.run_sql_retention_apply" ) as retention_apply, ): run_scheduled.return_value = { @@ -113,9 +113,9 @@ class RunBackupRecordsSnapshotTests(TestCase): write_yaml_atomic(meta_dir / "meta.yaml", {"status": "success", "started_at": "2026-05-19T02:15:00Z"}) with ( - patch("pobsync_backend.management.commands.run_pobsync_backup.run_scheduled") as run_scheduled, + patch("pobsync_backend.backup_runner.run_scheduled") as run_scheduled, patch( - "pobsync_backend.management.commands.run_pobsync_backup.run_sql_retention_apply" + "pobsync_backend.backup_runner.run_sql_retention_apply" ) as retention_apply, ): run_scheduled.return_value = { @@ -155,7 +155,7 @@ class RunBackupRecordsSnapshotTests(TestCase): meta_dir.mkdir(parents=True) write_yaml_atomic(meta_dir / "meta.yaml", {"status": "failed", "started_at": "2026-05-19T02:15:00Z"}) - with patch("pobsync_backend.management.commands.run_pobsync_backup.run_scheduled") as run_scheduled: + with patch("pobsync_backend.backup_runner.run_scheduled") as run_scheduled: run_scheduled.return_value = { "ok": False, "dry_run": False, @@ -179,7 +179,7 @@ class RunBackupRecordsSnapshotTests(TestCase): GlobalConfig.objects.create(name="default", backup_root=str(Path(tmp) / "backups")) host = HostConfig.objects.create(host="web-01", address="web-01.example.test") - with patch("pobsync_backend.management.commands.run_pobsync_backup.run_scheduled") as run_scheduled: + with patch("pobsync_backend.backup_runner.run_scheduled") as run_scheduled: run_scheduled.return_value = { "ok": True, "dry_run": True, @@ -198,3 +198,28 @@ class RunBackupRecordsSnapshotTests(TestCase): self.assertEqual(BackupRun.objects.count(), 1) self.assertIsNone(BackupRun.objects.get().snapshot) self.assertEqual(SnapshotRecord.objects.count(), 0) + + def test_manual_flag_records_manual_run_type(self) -> None: + with TemporaryDirectory() as tmp: + GlobalConfig.objects.create(name="default", backup_root=str(Path(tmp) / "backups")) + host = HostConfig.objects.create(host="web-01", address="web-01.example.test") + + with patch("pobsync_backend.backup_runner.run_scheduled") as run_scheduled: + run_scheduled.return_value = { + "ok": True, + "dry_run": True, + "host": host.host, + "base": None, + "rsync": {"exit_code": 0}, + } + call_command( + "run_pobsync_backup", + host.host, + prefix=str(Path(tmp) / "home"), + dry_run=True, + manual=True, + stdout=StringIO(), + ) + + run = BackupRun.objects.get() + self.assertEqual(run.run_type, BackupRun.RunType.MANUAL)