(feature) add queued backup worker foundation

Move backup execution out of the management command into a reusable
backup runner service that can execute an existing BackupRun record.

Add queue primitives and a run_pobsync_worker command so manual backup
requests can be recorded as queued SQL state and processed outside the
web request path.

Add a worker Docker service and pobsync worker CLI alias, with tests for
queued run creation, worker execution, manual run typing, and command
mapping.
This commit is contained in:
2026-05-19 13:00:12 +02:00
parent aea22597ba
commit fe8e65e12e
9 changed files with 361 additions and 92 deletions

View File

@@ -125,7 +125,7 @@ This starts Django on:
Run the scheduler alongside the web admin: Run the scheduler alongside the web admin:
``` ```
docker compose up --build web scheduler docker compose up --build web scheduler worker
``` ```
The container persists `/opt/pobsync` and the SQLite database in Docker volumes. The container persists `/opt/pobsync` and the SQLite database in Docker volumes.
@@ -133,7 +133,7 @@ Backup data is mounted at `/backups` inside the containers. By default this uses
Override it with `POBSYNC_BACKUP_ROOT`: Override it with `POBSYNC_BACKUP_ROOT`:
``` ```
POBSYNC_BACKUP_ROOT=/mnt/backups/pobsync docker compose up --build web scheduler POBSYNC_BACKUP_ROOT=/mnt/backups/pobsync docker compose up --build web scheduler worker
``` ```
In the Django global config, set the backup root to `/backups` when running in Docker. For local, non-Docker use, In the Django global config, set the backup root to `/backups` when running in Docker. For local, non-Docker use,
@@ -148,7 +148,7 @@ docker compose --profile mariadb up --build web-mariadb
With the scheduler: With the scheduler:
``` ```
docker compose --profile mariadb up --build web-mariadb scheduler-mariadb docker compose --profile mariadb up --build web-mariadb scheduler-mariadb worker-mariadb
``` ```
SQLite remains the default because it is enough for a single backup server and keeps deployment simple. SQLite remains the default because it is enough for a single backup server and keeps deployment simple.

View File

@@ -29,6 +29,20 @@ services:
- pobsync_db:/var/lib/pobsync - pobsync_db:/var/lib/pobsync
- ${POBSYNC_BACKUP_ROOT:-./backups}:/backups - ${POBSYNC_BACKUP_ROOT:-./backups}:/backups
# Queue worker for the default (SQLite) stack: runs the run_pobsync_worker
# management command in loop mode, checking for queued runs every 15 seconds.
worker:
build: .
command: python manage.py run_pobsync_worker --loop --interval 15
environment:
# Development defaults: debug enabled and a placeholder secret key.
POBSYNC_DJANGO_DEBUG: "1"
POBSYNC_DJANGO_SECRET_KEY: "dev-only-change-me"
POBSYNC_DJANGO_ALLOWED_HOSTS: "localhost,127.0.0.1,0.0.0.0"
POBSYNC_HOME: "/opt/pobsync"
POBSYNC_SQLITE_PATH: "/var/lib/pobsync/pobsync.sqlite3"
volumes:
# pobsync home, the SQLite database directory, and the backup root (/backups).
- pobsync_state:/opt/pobsync
- pobsync_db:/var/lib/pobsync
- ${POBSYNC_BACKUP_ROOT:-./backups}:/backups
web-mariadb: web-mariadb:
profiles: ["mariadb"] profiles: ["mariadb"]
build: . build: .
@@ -73,6 +87,27 @@ services:
- pobsync_state:/opt/pobsync - pobsync_state:/opt/pobsync
- ${POBSYNC_BACKUP_ROOT:-./backups}:/backups - ${POBSYNC_BACKUP_ROOT:-./backups}:/backups
# Queue worker for the "mariadb" profile: same worker command as the default
# stack, but configured to use the MariaDB service "db" instead of SQLite.
worker-mariadb:
profiles: ["mariadb"]
build: .
command: python manage.py run_pobsync_worker --loop --interval 15
environment:
# Development defaults: debug enabled and a placeholder secret key.
POBSYNC_DJANGO_DEBUG: "1"
POBSYNC_DJANGO_SECRET_KEY: "dev-only-change-me"
POBSYNC_DJANGO_ALLOWED_HOSTS: "localhost,127.0.0.1,0.0.0.0"
POBSYNC_HOME: "/opt/pobsync"
POBSYNC_DB_ENGINE: "mariadb"
POBSYNC_DB_HOST: "db"
POBSYNC_DB_NAME: "pobsync"
POBSYNC_DB_USER: "pobsync"
POBSYNC_DB_PASSWORD: "pobsync"
depends_on:
# Start only once the database container reports healthy.
db:
condition: service_healthy
volumes:
# pobsync home and the backup root (/backups); no SQLite volume is mounted here.
- pobsync_state:/opt/pobsync
- ${POBSYNC_BACKUP_ROOT:-./backups}:/backups
db: db:
profiles: ["mariadb"] profiles: ["mariadb"]
image: mariadb:11 image: mariadb:11

View File

@@ -15,6 +15,7 @@ COMMAND_ALIASES = {
"retention": "run_pobsync_retention", "retention": "run_pobsync_retention",
"discover-snapshots": "discover_pobsync_snapshots", "discover-snapshots": "discover_pobsync_snapshots",
"scheduler": "run_pobsync_scheduler", "scheduler": "run_pobsync_scheduler",
"worker": "run_pobsync_worker",
} }

View File

@@ -0,0 +1,148 @@
from __future__ import annotations
from pathlib import Path
from django.db import transaction
from django.utils import timezone
from pobsync.commands.run_scheduled import run_scheduled
from pobsync_backend.config_source import DjangoConfigSource
from pobsync_backend.models import BackupRun, HostConfig
from pobsync_backend.retention import run_sql_retention_apply
from pobsync_backend.snapshot_discovery import infer_snapshot_kind, upsert_snapshot_record
def queue_backup_run(
    *,
    host: HostConfig,
    run_type: str = BackupRun.RunType.MANUAL,
    dry_run: bool = False,
    prune: bool = False,
    prune_max_delete: int = 10,
    prune_protect_bases: bool = False,
) -> BackupRun:
    """Record a backup request as a queued ``BackupRun`` row.

    The options are coerced to plain ``bool``/``int`` values and stored under
    ``result["requested"]`` on the new row, where ``requested_options`` can
    read them back later.

    Args:
        host: Host the backup is requested for.
        run_type: Value stored in the row's ``run_type`` field.
        dry_run: Requested dry-run flag.
        prune: Whether retention should be applied after a successful run.
        prune_max_delete: Requested retention deletion limit.
        prune_protect_bases: Requested base-protection flag for retention.

    Returns:
        The newly created run with status ``QUEUED``.
    """
    requested = {
        "dry_run": bool(dry_run),
        "prune": bool(prune),
        "prune_max_delete": int(prune_max_delete),
        "prune_protect_bases": bool(prune_protect_bases),
    }
    queued_run = BackupRun.objects.create(
        host=host,
        run_type=run_type,
        status=BackupRun.Status.QUEUED,
        result={"requested": requested},
    )
    return queued_run
def execute_backup_run(
    *,
    run: BackupRun,
    prefix: Path,
    dry_run: bool = False,
    prune: bool = False,
    prune_max_delete: int = 10,
    prune_protect_bases: bool = False,
) -> BackupRun:
    """Execute the backup for an existing ``BackupRun`` and record the outcome.

    The run is marked RUNNING (keeping an already-set ``started_at``), the
    backup is performed through ``run_scheduled``, and the status, paths,
    rsync exit code, snapshot record and raw result are saved on the run.
    When the backup succeeded, was not a dry run and ``prune`` is set,
    retention is applied afterwards and its result is stored under
    ``result["prune"]``.

    Args:
        run: The run to execute; updated and saved in place.
        prefix: Pobsync home directory passed to the backup and retention calls.
        dry_run: Passed through to ``run_scheduled``.
        prune: Apply retention after a successful, non-dry run.
        prune_max_delete: Deletion limit passed to retention.
        prune_protect_bases: Base-protection flag passed to retention.

    Returns:
        The same ``run`` instance after its outcome has been saved.

    Raises:
        Exception: Whatever ``run_scheduled`` or the retention step raises;
            the run is saved as FAILED before the exception propagates.
    """
    outcome_fields = [
        "status",
        "ended_at",
        "snapshot_path",
        "snapshot",
        "base_path",
        "rsync_exit_code",
        "result",
    ]

    run.status = BackupRun.Status.RUNNING
    if not run.started_at:
        run.started_at = timezone.now()
    run.save(update_fields=["status", "started_at"])

    try:
        result = run_scheduled(
            prefix=prefix,
            host=run.host.host,
            dry_run=bool(dry_run),
            prune=False,
            config_source=DjangoConfigSource(),
        )
    except Exception as exc:
        # The backup itself blew up: persist the failure, then let it propagate.
        run.status = BackupRun.Status.FAILED
        run.ended_at = timezone.now()
        run.result = {"ok": False, "error": str(exc), "type": type(exc).__name__}
        run.save(update_fields=["status", "ended_at", "result"])
        raise

    if result.get("ok"):
        run.status = BackupRun.Status.SUCCESS
    else:
        run.status = BackupRun.Status.FAILED
    run.ended_at = timezone.now()
    run.snapshot_path = str(result.get("snapshot") or "")
    run.base_path = str(result.get("base") or "")
    rsync_info = result.get("rsync")
    run.rsync_exit_code = rsync_info.get("exit_code") if isinstance(rsync_info, dict) else None
    run.result = result

    record = None
    if run.snapshot_path:
        snapshot_dir = Path(run.snapshot_path)
        try:
            snapshot_kind = infer_snapshot_kind(snapshot_dir)
            record, _created = upsert_snapshot_record(
                host=run.host,
                kind=snapshot_kind,
                snapshot_dir=snapshot_dir,
            )
        except ValueError:
            # A ValueError here leaves the run without a linked snapshot record.
            record = None

    def save_outcome() -> None:
        # Single place that writes the final state of the run.
        run.snapshot = record
        run.result = result
        run.save(update_fields=outcome_fields)

    if not (result.get("ok") and not result.get("dry_run") and prune):
        save_outcome()
        return run

    try:
        result["prune"] = run_sql_retention_apply(
            prefix=prefix,
            host=run.host.host,
            kind="scheduled",
            protect_bases=bool(prune_protect_bases),
            yes=True,
            max_delete=int(prune_max_delete),
            acquire_lock=False,
        )
    except Exception as exc:
        # Retention failed after a good backup: record it and mark the run failed.
        result["prune"] = {"ok": False, "error": str(exc), "type": type(exc).__name__}
        run.status = BackupRun.Status.FAILED
        save_outcome()
        raise

    save_outcome()
    return run
def claim_next_queued_run() -> BackupRun | None:
    """Claim the oldest queued run of an enabled host and mark it RUNNING.

    Claiming is a compare-and-swap. The candidate row is switched to RUNNING
    by one conditional ``UPDATE ... WHERE status = QUEUED``, and the affected
    row count decides who owns it. A plain read followed by ``save()`` lets
    two workers select the same QUEUED row and both execute the backup; with
    the conditional update only one of them sees a row count of 1.

    A candidate lost to another worker is excluded and the next one is tried,
    so the loop ends once every queued row has been claimed or skipped.

    Note that ``QuerySet.update()`` does not send ``pre_save``/``post_save``
    signals, unlike ``Model.save()``.

    Returns:
        The claimed run with ``host`` preloaded and ``status``/``started_at``
        reflecting the claim, or ``None`` when nothing claimable is queued.
    """
    lost_races = []
    while True:
        with transaction.atomic():
            run = (
                BackupRun.objects.select_related("host")
                .filter(status=BackupRun.Status.QUEUED, host__enabled=True)
                .exclude(pk__in=lost_races)
                .order_by("created_at", "id")
                .first()
            )
            if run is None:
                return None
            started_at = timezone.now()
            # Only succeeds while the row is still QUEUED in the database.
            claimed = BackupRun.objects.filter(
                pk=run.pk,
                status=BackupRun.Status.QUEUED,
            ).update(status=BackupRun.Status.RUNNING, started_at=started_at)
        if claimed:
            # Mirror the database change on the instance handed to the caller.
            run.status = BackupRun.Status.RUNNING
            run.started_at = started_at
            return run
        # Another worker got there first; never retry this row.
        lost_races.append(run.pk)
def requested_options(run: BackupRun) -> dict[str, object]:
    """Return the options stored under ``result["requested"]`` of a run.

    An empty dict is returned when ``run.result`` is not a dict or when its
    ``"requested"`` entry is missing or not a dict.
    """
    stored = run.result
    if isinstance(stored, dict):
        requested = stored.get("requested")
        if isinstance(requested, dict):
            return requested
    return {}

View File

@@ -5,18 +5,14 @@ from typing import Any
from django.conf import settings from django.conf import settings
from django.core.management.base import BaseCommand, CommandError from django.core.management.base import BaseCommand, CommandError
from django.utils import timezone
from pobsync.commands.run_scheduled import run_scheduled
from pobsync.paths import PobsyncPaths from pobsync.paths import PobsyncPaths
from pobsync_backend.config_source import DjangoConfigSource from pobsync_backend.backup_runner import execute_backup_run
from pobsync_backend.models import BackupRun, HostConfig from pobsync_backend.models import BackupRun, HostConfig
from pobsync_backend.retention import run_sql_retention_apply
from pobsync_backend.snapshot_discovery import infer_snapshot_kind, upsert_snapshot_record
class Command(BaseCommand): class Command(BaseCommand):
help = "Run a scheduled pobsync backup and record the result in Django." help = "Run a pobsync backup and record the result in Django."
def add_arguments(self, parser) -> None: def add_arguments(self, parser) -> None:
parser.add_argument("host", help="Host to back up") parser.add_argument("host", help="Host to back up")
@@ -25,6 +21,7 @@ class Command(BaseCommand):
parser.add_argument("--prune", action="store_true", help="Apply retention after a successful run") parser.add_argument("--prune", action="store_true", help="Apply retention after a successful run")
parser.add_argument("--prune-max-delete", type=int, default=10) parser.add_argument("--prune-max-delete", type=int, default=10)
parser.add_argument("--prune-protect-bases", action="store_true") parser.add_argument("--prune-protect-bases", action="store_true")
parser.add_argument("--manual", action="store_true", help="Record the run as manual instead of scheduled")
def handle(self, *args: Any, **options: Any) -> None: def handle(self, *args: Any, **options: Any) -> None:
host_name = options["host"] host_name = options["host"]
@@ -36,86 +33,20 @@ class Command(BaseCommand):
run = BackupRun.objects.create( run = BackupRun.objects.create(
host=host, host=host,
run_type=BackupRun.RunType.SCHEDULED, run_type=BackupRun.RunType.MANUAL if options["manual"] else BackupRun.RunType.SCHEDULED,
status=BackupRun.Status.RUNNING, status=BackupRun.Status.RUNNING,
started_at=timezone.now(),
) )
execute_backup_run(
try: run=run,
result = run_scheduled(
prefix=paths.home, prefix=paths.home,
host=host.host,
dry_run=bool(options["dry_run"]), dry_run=bool(options["dry_run"]),
prune=False, prune=bool(options["prune"]),
config_source=DjangoConfigSource(), prune_max_delete=int(options["prune_max_delete"]),
prune_protect_bases=bool(options["prune_protect_bases"]),
) )
except Exception as exc: run.refresh_from_db()
run.status = BackupRun.Status.FAILED
run.ended_at = timezone.now()
run.result = {"ok": False, "error": str(exc), "type": type(exc).__name__}
run.save(update_fields=["status", "ended_at", "result"])
raise
run.status = BackupRun.Status.SUCCESS if result.get("ok") else BackupRun.Status.FAILED if run.status == BackupRun.Status.SUCCESS:
run.ended_at = timezone.now()
run.snapshot_path = str(result.get("snapshot") or "")
run.base_path = str(result.get("base") or "")
rsync = result.get("rsync") if isinstance(result.get("rsync"), dict) else {}
run.rsync_exit_code = rsync.get("exit_code")
run.result = result
snapshot_record = None
if run.snapshot_path:
snapshot_path = Path(run.snapshot_path)
try:
kind = infer_snapshot_kind(snapshot_path)
snapshot_record, _created = upsert_snapshot_record(host=host, kind=kind, snapshot_dir=snapshot_path)
except ValueError:
snapshot_record = None
if result.get("ok") and not result.get("dry_run") and options["prune"]:
try:
result["prune"] = run_sql_retention_apply(
prefix=paths.home,
host=host.host,
kind="scheduled",
protect_bases=bool(options["prune_protect_bases"]),
yes=True,
max_delete=int(options["prune_max_delete"]),
acquire_lock=False,
)
except Exception as exc:
result["prune"] = {"ok": False, "error": str(exc), "type": type(exc).__name__}
run.status = BackupRun.Status.FAILED
run.result = result
run.snapshot = snapshot_record
run.save(
update_fields=[
"status",
"ended_at",
"snapshot_path",
"snapshot",
"base_path",
"rsync_exit_code",
"result",
],
)
raise
run.snapshot = snapshot_record
run.result = result
run.save(
update_fields=[
"status",
"ended_at",
"snapshot_path",
"snapshot",
"base_path",
"rsync_exit_code",
"result",
],
)
if result.get("ok"):
self.stdout.write(self.style.SUCCESS(f"Backup completed for {host.host}.")) self.stdout.write(self.style.SUCCESS(f"Backup completed for {host.host}."))
return return

View File

@@ -0,0 +1,52 @@
from __future__ import annotations
import time
from pathlib import Path
from typing import Any
from django.conf import settings
from django.core.management.base import BaseCommand
from pobsync.paths import PobsyncPaths
from pobsync_backend.backup_runner import claim_next_queued_run, execute_backup_run, requested_options
class Command(BaseCommand):
    """Management command that executes backup runs queued in the database."""

    help = "Run queued pobsync backup jobs from the Django database."

    def add_arguments(self, parser) -> None:
        """Register the worker's command-line options."""
        parser.add_argument("--prefix", default=settings.POBSYNC_HOME, help="Pobsync home directory")
        parser.add_argument("--once", action="store_true", help="Process one queued run and exit")
        parser.add_argument("--loop", action="store_true", help="Keep checking for queued runs")
        parser.add_argument("--interval", type=int, default=15, help="Loop interval in seconds")

    def handle(self, *args: Any, **options: Any) -> None:
        """Process one queued run, or keep polling when only ``--loop`` is given."""
        # With neither flag the command behaves like --once; --once also wins over --loop.
        single_pass = options["once"] or not options["loop"]
        home = PobsyncPaths(home=Path(options["prefix"])).home

        while True:
            processed = self._run_once(prefix=home)
            self.stdout.write(f"Ran {processed} queued backup run(s).")
            if single_pass:
                break
            # Never poll more often than once per second.
            time.sleep(max(1, int(options["interval"])))

    def _run_once(self, *, prefix: Path) -> int:
        """Claim and execute at most one queued run.

        Returns:
            ``0`` when no run could be claimed, otherwise ``1``. A run that
            raised still counts as 1; its error is written to stderr.
        """
        run = claim_next_queued_run()
        if run is None:
            return 0

        requested = requested_options(run)
        try:
            run_kwargs = {
                "dry_run": bool(requested.get("dry_run", False)),
                "prune": bool(requested.get("prune", False)),
                "prune_max_delete": int(requested.get("prune_max_delete", 10)),
                "prune_protect_bases": bool(requested.get("prune_protect_bases", False)),
            }
            execute_backup_run(run=run, prefix=prefix, **run_kwargs)
        except Exception as exc:
            self.stderr.write(f"{run.host.host}: {type(exc).__name__}: {exc}")
        return 1

View File

@@ -0,0 +1,70 @@
from __future__ import annotations
from pathlib import Path
from tempfile import TemporaryDirectory
from unittest.mock import patch
from django.test import TestCase
from pobsync.util import write_yaml_atomic
from pobsync_backend.backup_runner import queue_backup_run
from pobsync_backend.management.commands.run_pobsync_worker import Command
from pobsync_backend.models import BackupRun, GlobalConfig, HostConfig, SnapshotRecord
class BackupWorkerTests(TestCase):
    """Tests for queueing backup runs and the worker's single-run step."""

    def test_queue_backup_run_records_requested_options(self) -> None:
        """Queued runs are manual by default and keep the requested options."""
        web_host = HostConfig.objects.create(host="web-01", address="web-01.example.test")
        expected_request = {
            "dry_run": True,
            "prune": True,
            "prune_max_delete": 3,
            "prune_protect_bases": True,
        }

        queued = queue_backup_run(
            host=web_host,
            dry_run=True,
            prune=True,
            prune_max_delete=3,
            prune_protect_bases=True,
        )

        self.assertEqual(queued.status, BackupRun.Status.QUEUED)
        self.assertEqual(queued.run_type, BackupRun.RunType.MANUAL)
        self.assertEqual(queued.result["requested"], expected_request)

    def test_worker_executes_next_queued_run(self) -> None:
        """One worker step runs the queued backup and links its snapshot record."""
        with TemporaryDirectory() as workdir:
            root = Path(workdir) / "backups"
            GlobalConfig.objects.create(name="default", backup_root=str(root))
            web_host = HostConfig.objects.create(host="web-01", address="web-01.example.test")

            # Snapshot directory with metadata on disk for the mocked backup to point at.
            snapshot_dir = root / web_host.host / "scheduled" / "20260519-021500Z__ABCDEFGH"
            meta_dir = snapshot_dir / "meta"
            meta_dir.mkdir(parents=True)
            write_yaml_atomic(meta_dir / "meta.yaml", {"status": "success", "started_at": "2026-05-19T02:15:00Z"})

            queued = queue_backup_run(host=web_host)
            backup_reply = {
                "ok": True,
                "dry_run": False,
                "host": web_host.host,
                "snapshot": str(snapshot_dir),
                "base": None,
                "rsync": {"exit_code": 0},
            }

            with patch("pobsync_backend.backup_runner.run_scheduled") as run_scheduled:
                run_scheduled.return_value = backup_reply
                processed = Command()._run_once(prefix=Path(workdir) / "home")

            self.assertEqual(processed, 1)
            queued.refresh_from_db()
            self.assertEqual(queued.status, BackupRun.Status.SUCCESS)
            self.assertEqual(SnapshotRecord.objects.count(), 1)
            self.assertEqual(queued.snapshot, SnapshotRecord.objects.get())

    def test_worker_returns_zero_without_queued_runs(self) -> None:
        """With an empty queue the worker step reports zero processed runs."""
        processed = Command()._run_once(prefix=Path("/opt/pobsync"))

        self.assertEqual(processed, 0)

View File

@@ -46,3 +46,10 @@ class ConsoleEntrypointTests(SimpleTestCase):
self.assertEqual(exit_code, 0) self.assertEqual(exit_code, 0)
execute.assert_called_once_with(["pobsync", "discover_pobsync_snapshots", "--host", "web-01"]) execute.assert_called_once_with(["pobsync", "discover_pobsync_snapshots", "--host", "web-01"])
def test_maps_worker_alias_to_django_command(self) -> None:
    """The ``worker`` alias is dispatched as ``run_pobsync_worker`` with its arguments."""
    with patch("pobsync.cli.execute_from_command_line") as dispatch:
        status = main(["worker", "--once"])

    self.assertEqual(status, 0)
    expected_argv = ["pobsync", "run_pobsync_worker", "--once"]
    dispatch.assert_called_once_with(expected_argv)

View File

@@ -32,7 +32,7 @@ class RunBackupRecordsSnapshotTests(TestCase):
}, },
) )
with patch("pobsync_backend.management.commands.run_pobsync_backup.run_scheduled") as run_scheduled: with patch("pobsync_backend.backup_runner.run_scheduled") as run_scheduled:
run_scheduled.return_value = { run_scheduled.return_value = {
"ok": True, "ok": True,
"dry_run": False, "dry_run": False,
@@ -63,9 +63,9 @@ class RunBackupRecordsSnapshotTests(TestCase):
write_yaml_atomic(meta_dir / "meta.yaml", {"status": "success", "started_at": "2026-05-19T02:15:00Z"}) write_yaml_atomic(meta_dir / "meta.yaml", {"status": "success", "started_at": "2026-05-19T02:15:00Z"})
with ( with (
patch("pobsync_backend.management.commands.run_pobsync_backup.run_scheduled") as run_scheduled, patch("pobsync_backend.backup_runner.run_scheduled") as run_scheduled,
patch( patch(
"pobsync_backend.management.commands.run_pobsync_backup.run_sql_retention_apply" "pobsync_backend.backup_runner.run_sql_retention_apply"
) as retention_apply, ) as retention_apply,
): ):
run_scheduled.return_value = { run_scheduled.return_value = {
@@ -113,9 +113,9 @@ class RunBackupRecordsSnapshotTests(TestCase):
write_yaml_atomic(meta_dir / "meta.yaml", {"status": "success", "started_at": "2026-05-19T02:15:00Z"}) write_yaml_atomic(meta_dir / "meta.yaml", {"status": "success", "started_at": "2026-05-19T02:15:00Z"})
with ( with (
patch("pobsync_backend.management.commands.run_pobsync_backup.run_scheduled") as run_scheduled, patch("pobsync_backend.backup_runner.run_scheduled") as run_scheduled,
patch( patch(
"pobsync_backend.management.commands.run_pobsync_backup.run_sql_retention_apply" "pobsync_backend.backup_runner.run_sql_retention_apply"
) as retention_apply, ) as retention_apply,
): ):
run_scheduled.return_value = { run_scheduled.return_value = {
@@ -155,7 +155,7 @@ class RunBackupRecordsSnapshotTests(TestCase):
meta_dir.mkdir(parents=True) meta_dir.mkdir(parents=True)
write_yaml_atomic(meta_dir / "meta.yaml", {"status": "failed", "started_at": "2026-05-19T02:15:00Z"}) write_yaml_atomic(meta_dir / "meta.yaml", {"status": "failed", "started_at": "2026-05-19T02:15:00Z"})
with patch("pobsync_backend.management.commands.run_pobsync_backup.run_scheduled") as run_scheduled: with patch("pobsync_backend.backup_runner.run_scheduled") as run_scheduled:
run_scheduled.return_value = { run_scheduled.return_value = {
"ok": False, "ok": False,
"dry_run": False, "dry_run": False,
@@ -179,7 +179,7 @@ class RunBackupRecordsSnapshotTests(TestCase):
GlobalConfig.objects.create(name="default", backup_root=str(Path(tmp) / "backups")) GlobalConfig.objects.create(name="default", backup_root=str(Path(tmp) / "backups"))
host = HostConfig.objects.create(host="web-01", address="web-01.example.test") host = HostConfig.objects.create(host="web-01", address="web-01.example.test")
with patch("pobsync_backend.management.commands.run_pobsync_backup.run_scheduled") as run_scheduled: with patch("pobsync_backend.backup_runner.run_scheduled") as run_scheduled:
run_scheduled.return_value = { run_scheduled.return_value = {
"ok": True, "ok": True,
"dry_run": True, "dry_run": True,
@@ -198,3 +198,28 @@ class RunBackupRecordsSnapshotTests(TestCase):
self.assertEqual(BackupRun.objects.count(), 1) self.assertEqual(BackupRun.objects.count(), 1)
self.assertIsNone(BackupRun.objects.get().snapshot) self.assertIsNone(BackupRun.objects.get().snapshot)
self.assertEqual(SnapshotRecord.objects.count(), 0) self.assertEqual(SnapshotRecord.objects.count(), 0)
def test_manual_flag_records_manual_run_type(self) -> None:
    """Passing ``manual=True`` to the backup command records a MANUAL run."""
    with TemporaryDirectory() as workdir:
        workdir_path = Path(workdir)
        GlobalConfig.objects.create(name="default", backup_root=str(workdir_path / "backups"))
        web_host = HostConfig.objects.create(host="web-01", address="web-01.example.test")
        dry_run_reply = {
            "ok": True,
            "dry_run": True,
            "host": web_host.host,
            "base": None,
            "rsync": {"exit_code": 0},
        }

        with patch("pobsync_backend.backup_runner.run_scheduled") as run_scheduled:
            run_scheduled.return_value = dry_run_reply
            call_command(
                "run_pobsync_backup",
                web_host.host,
                prefix=str(workdir_path / "home"),
                dry_run=True,
                manual=True,
                stdout=StringIO(),
            )

    recorded = BackupRun.objects.get()
    self.assertEqual(recorded.run_type, BackupRun.RunType.MANUAL)