diff --git a/README.md b/README.md index 1c3eeef..0ff8f1a 100644 --- a/README.md +++ b/README.md @@ -46,6 +46,7 @@ The installer will, by default: - create `/var/lib/pobsync`, `/var/log/pobsync`, and the backup root - install Python dependencies - run migrations and collect static files +- generate a default SSH key for the service user if one does not exist yet - install and start `pobsync-web`, `pobsync-worker`, and `pobsync-scheduler` - guide you through the first login and setup steps @@ -141,16 +142,21 @@ The UI includes: ## SSH Keys -SSH keys can be managed from `/ssh-credentials/`. Add a private key, optionally paste `known_hosts` entries, and select -the credential either as the global default or as a per-host override. +SSH keys can be managed from `/ssh-credentials/`. The recommended flow is to generate keys from Django or during the +installer. pobsync stores the private key on disk under `POBSYNC_HOME`, keeps the public key visible in the UI, and lets +you select a credential either as the global default or as a per-host override. -When a backup starts, the worker writes the selected key to: +Generated private keys are stored at: ``` $POBSYNC_HOME/state/ssh-credentials//identity ``` -The key file is written with `0600` permissions and injected into the rsync SSH command with `IdentityFile`. +The key file is written with `0600` permissions and injected into the rsync SSH command with `IdentityFile`. Copy the +public key shown in Django to the target host's `authorized_keys`. + +Existing private keys can still be added manually, but generated filesystem keys are preferred for native systemd +production installs. ## Updates diff --git a/scripts/install-systemd b/scripts/install-systemd index 422dc8e..e16fd2c 100755 --- a/scripts/install-systemd +++ b/scripts/install-systemd @@ -461,6 +461,7 @@ run_step "Install systemd units" install_units run_step "Reload systemd" systemctl daemon-reload run_step "Run database migrations" "$VENV_DIR/bin/python" "$APP_DIR/manage.py" migrate --noinput +run_step "Ensure default SSH key" "$VENV_DIR/bin/python" "$APP_DIR/manage.py" ensure_pobsync_ssh_key --name default --set-global-default run_step "Collect static files" "$VENV_DIR/bin/python" "$APP_DIR/manage.py" collectstatic --noinput --clear run_step "Finalize state permissions" chown -R "$SERVICE_USER:$SERVICE_GROUP" /var/lib/pobsync /var/log/pobsync diff --git a/src/pobsync_backend/admin.py b/src/pobsync_backend/admin.py index 5798df7..d0afe15 100644 --- a/src/pobsync_backend/admin.py +++ b/src/pobsync_backend/admin.py @@ -11,11 +11,12 @@ from .models import BackupRun, GlobalConfig, HostConfig, ScheduleConfig, Snapsho @admin.register(SshCredential) class SshCredentialAdmin(admin.ModelAdmin): - list_display = ("name", "has_public_key", "has_known_hosts", "updated_at") - readonly_fields = ("created_at", "updated_at") + list_display = ("name", "key_type", "generated", "has_public_key", "has_known_hosts", "updated_at") + readonly_fields = ("created_at", "updated_at", "fingerprint") search_fields = ("name", "notes") fieldsets = ( - (None, {"fields": ("name", "private_key", "public_key", "known_hosts", "notes")}), + (None, {"fields": ("name", "key_type", "generated", "key_path", "fingerprint")}), + ("Key material", {"fields": ("private_key", "public_key", "known_hosts", "notes")}), ("Timestamps", {"fields": ("created_at", "updated_at"), "classes": ("collapse",)}), ) diff --git a/src/pobsync_backend/config_source.py b/src/pobsync_backend/config_source.py index c916699..d1bbd68 100644 --- a/src/pobsync_backend/config_source.py +++ b/src/pobsync_backend/config_source.py @@ -11,6 +11,7 @@ from pobsync.paths import PobsyncPaths from .config_repository import global_config_data, host_config_data from .models import GlobalConfig, HostConfig, SshCredential +from .ssh_keys import identity_path class DjangoConfigSource: @@ -48,9 +49,12 @@ def _materialize_credential(credential: SshCredential) -> dict[str, str]: credential_dir.mkdir(mode=0o700, parents=True, exist_ok=True) os.chmod(credential_dir, 0o700) - identity_file = credential_dir / "identity" - identity_file.write_text(_with_trailing_newline(credential.private_key), encoding="utf-8") - os.chmod(identity_file, 0o600) + identity_file = identity_path(credential) + if credential.key_path: + os.chmod(identity_file, 0o600) + else: + identity_file.write_text(_with_trailing_newline(credential.private_key), encoding="utf-8") + os.chmod(identity_file, 0o600) result = {"identity_file": str(identity_file)} if credential.known_hosts.strip(): diff --git a/src/pobsync_backend/forms.py b/src/pobsync_backend/forms.py index 990c615..9a4b3f8 100644 --- a/src/pobsync_backend/forms.py +++ b/src/pobsync_backend/forms.py @@ -186,7 +186,9 @@ class SshCredentialForm(forms.ModelForm): raw_private_key = self.cleaned_data.get("private_key", "") if not raw_private_key.strip(): - raise forms.ValidationError("Paste a private key or upload a private key file.") + if self.instance and self.instance.pk and self.instance.key_path: + return self.instance.private_key + raise forms.ValidationError("Paste a private key, upload a private key file, or generate a key from Django.") private_key = normalize_private_key(raw_private_key) public_key = validate_ssh_private_key(private_key) @@ -198,6 +200,8 @@ class SshCredentialForm(forms.ModelForm): provided_public_key = normalize_public_key(cleaned_data.get("public_key", "")) if provided_public_key: cleaned_data["public_key"] = provided_public_key + elif self.instance and self.instance.pk and self.instance.key_path: + cleaned_data["public_key"] = self.instance.public_key if cleaned_data.get("private_key") and provided_public_key and hasattr(self, "derived_public_key"): if public_key_identity(provided_public_key) != public_key_identity(self.derived_public_key): @@ -210,6 +214,32 @@ class SshCredentialForm(forms.ModelForm): return cleaned_data +class SshCredentialGenerateForm(forms.Form): + name = forms.CharField(max_length=128) + key_type = forms.ChoiceField( + choices=(("ed25519", "ed25519"), ("rsa", "rsa")), + initial="ed25519", + help_text="ed25519 is recommended unless you need RSA for an older target.", + ) + set_global_default = forms.BooleanField( + required=False, + initial=True, + help_text="Use this key as the global default when the default global config exists.", + ) + known_hosts = forms.CharField( + widget=forms.Textarea(attrs={"spellcheck": "false", "autocomplete": "off"}), + required=False, + help_text="Optional known_hosts entries. This can also be filled later.", + ) + notes = forms.CharField(widget=forms.Textarea, required=False) + + def clean_name(self) -> str: + name = self.cleaned_data["name"].strip() + if SshCredential.objects.filter(name=name).exists(): + raise forms.ValidationError("An SSH credential with this name already exists.") + return name + + class RetentionApplyForm(forms.Form): kind = forms.ChoiceField(choices=(("scheduled", "Scheduled"), ("manual", "Manual"), ("all", "All"))) protect_bases = forms.BooleanField(required=False) diff --git a/src/pobsync_backend/host_ops.py b/src/pobsync_backend/host_ops.py index d290bd8..e93e5f4 100644 --- a/src/pobsync_backend/host_ops.py +++ b/src/pobsync_backend/host_ops.py @@ -7,6 +7,7 @@ from pobsync.snapshot_meta import resolve_host_root from .models import GlobalConfig, HostConfig from .self_check import SelfCheck +from .ssh_keys import identity_path HOST_BACKUP_SUBDIRS = ("scheduled", "manual", ".incomplete") @@ -43,13 +44,15 @@ def collect_host_checks(host: HostConfig, global_config: GlobalConfig | None = N ) credential = host.ssh_credential or global_config.default_ssh_credential - checks.append( - SelfCheck( - "Host SSH credential", - "ok" if credential else "warning", - str(credential) if credential else "No host or global SSH credential selected.", - ) - ) + if credential is None: + checks.append(SelfCheck("Host SSH credential", "warning", "No host or global SSH credential selected.")) + else: + checks.append(SelfCheck("Host SSH credential", "ok", str(credential))) + if credential.key_path: + key_path = identity_path(credential) + checks.append( + _host_path_check("Host SSH key file", key_path, must_exist=True, must_be_writable=False, must_be_readable=True) + ) host_root = resolve_host_root(global_config.backup_root, host.host) checks.append(_host_path_check("Host backup root", host_root, must_exist=True, must_be_writable=True)) @@ -58,7 +61,14 @@ def collect_host_checks(host: HostConfig, global_config: GlobalConfig | None = N return checks -def _host_path_check(name: str, path: Path, *, must_exist: bool, must_be_writable: bool) -> SelfCheck: +def _host_path_check( + name: str, + path: Path, + *, + must_exist: bool, + must_be_writable: bool, + must_be_readable: bool = False, +) -> SelfCheck: if must_exist and not path.exists(): return SelfCheck(name, "failed", f"{path} does not exist.") target = path if path.exists() else path.parent @@ -66,4 +76,6 @@ def _host_path_check(name: str, path: Path, *, must_exist: bool, must_be_writabl return SelfCheck(name, "failed", f"{target} does not exist.") if must_be_writable and not os.access(target, os.W_OK): return SelfCheck(name, "failed", f"{target} is not writable by this process.") + if must_be_readable and not os.access(target, os.R_OK): + return SelfCheck(name, "failed", f"{target} is not readable by this process.") return SelfCheck(name, "ok", str(path)) diff --git a/src/pobsync_backend/management/commands/ensure_pobsync_ssh_key.py b/src/pobsync_backend/management/commands/ensure_pobsync_ssh_key.py new file mode 100644 index 0000000..083f999 --- /dev/null +++ b/src/pobsync_backend/management/commands/ensure_pobsync_ssh_key.py @@ -0,0 +1,47 @@ +from __future__ import annotations + +from django.core.management.base import BaseCommand, CommandError + +from pobsync_backend.models import GlobalConfig, SshCredential +from pobsync_backend.ssh_keys import SshKeyError, generate_ssh_key + + +class Command(BaseCommand): + help = "Ensure a filesystem-backed SSH key exists for pobsync backups." + + def add_arguments(self, parser): + parser.add_argument("--name", default="default", help="Credential name to create or reuse.") + parser.add_argument("--key-type", default="ed25519", choices=("ed25519", "rsa")) + parser.add_argument( + "--set-global-default", + action="store_true", + help="Set this key as default on the default global config when it exists.", + ) + + def handle(self, *args, **options): + name = options["name"] + credential, created = SshCredential.objects.get_or_create( + name=name, + defaults={ + "key_type": options["key_type"], + "notes": "Generated by pobsync installer.", + }, + ) + + if not credential.key_path and not credential.private_key: + try: + generate_ssh_key(credential, key_type=options["key_type"]) + except SshKeyError as exc: + raise CommandError(str(exc)) from exc + created = True + + if options["set_global_default"]: + global_config = GlobalConfig.objects.filter(name="default").first() + if global_config is not None and global_config.default_ssh_credential_id is None: + global_config.default_ssh_credential = credential + global_config.save(update_fields=["default_ssh_credential", "updated_at"]) + + action = "created" if created else "exists" + self.stdout.write(self.style.SUCCESS(f"SSH credential {action}: {credential.name}")) + if credential.public_key: + self.stdout.write(credential.public_key) diff --git a/src/pobsync_backend/migrations/0007_filesystem_ssh_credentials.py b/src/pobsync_backend/migrations/0007_filesystem_ssh_credentials.py new file mode 100644 index 0000000..aea71d8 --- /dev/null +++ b/src/pobsync_backend/migrations/0007_filesystem_ssh_credentials.py @@ -0,0 +1,35 @@ +from django.db import migrations, models + + +class Migration(migrations.Migration): + dependencies = [ + ("pobsync_backend", "0006_ssh_credentials"), + ] + + operations = [ + migrations.AlterField( + model_name="sshcredential", + name="private_key", + field=models.TextField(blank=True, default=""), + ), + migrations.AddField( + model_name="sshcredential", + name="key_path", + field=models.CharField(blank=True, max_length=1024), + ), + migrations.AddField( + model_name="sshcredential", + name="key_type", + field=models.CharField(default="ed25519", max_length=32), + ), + migrations.AddField( + model_name="sshcredential", + name="fingerprint", + field=models.CharField(blank=True, max_length=255), + ), + migrations.AddField( + model_name="sshcredential", + name="generated", + field=models.BooleanField(default=False), + ), + ] diff --git a/src/pobsync_backend/models.py b/src/pobsync_backend/models.py index d2f1a8f..b4fbb5f 100644 --- a/src/pobsync_backend/models.py +++ b/src/pobsync_backend/models.py @@ -80,8 +80,12 @@ class HostConfig(TimestampedModel): class SshCredential(TimestampedModel): name = models.CharField(max_length=128, unique=True) - private_key = models.TextField() + private_key = models.TextField(blank=True, default="") public_key = models.TextField(blank=True) + key_path = models.CharField(max_length=1024, blank=True) + key_type = models.CharField(max_length=32, default="ed25519") + fingerprint = models.CharField(max_length=255, blank=True) + generated = models.BooleanField(default=False) known_hosts = models.TextField(blank=True) notes = models.TextField(blank=True) diff --git a/src/pobsync_backend/ssh_keys.py b/src/pobsync_backend/ssh_keys.py new file mode 100644 index 0000000..8aa50d2 --- /dev/null +++ b/src/pobsync_backend/ssh_keys.py @@ -0,0 +1,110 @@ +from __future__ import annotations + +import os +import shutil +import subprocess +from pathlib import Path + +from django.conf import settings + +from .models import SshCredential + + +class SshKeyError(RuntimeError): + pass + + +def credential_dir(credential: SshCredential) -> Path: + return Path(settings.POBSYNC_HOME) / "state" / "ssh-credentials" / str(credential.pk) + + +def identity_path(credential: SshCredential) -> Path: + if credential.key_path: + return Path(credential.key_path) + return credential_dir(credential) / "identity" + + +def generate_ssh_key(credential: SshCredential, *, key_type: str = "ed25519", force: bool = False) -> SshCredential: + if credential.pk is None: + raise SshKeyError("Credential must be saved before generating an SSH key.") + if shutil.which("ssh-keygen") is None: + raise SshKeyError("ssh-keygen is not available.") + + key_dir = credential_dir(credential) + key_dir.mkdir(mode=0o700, parents=True, exist_ok=True) + os.chmod(key_dir, 0o700) + + private_key = key_dir / "identity" + public_key_file = key_dir / "identity.pub" + if force: + private_key.unlink(missing_ok=True) + public_key_file.unlink(missing_ok=True) + elif private_key.exists() or public_key_file.exists(): + raise SshKeyError(f"SSH key already exists for {credential.name}.") + + result = subprocess.run( + [ + "ssh-keygen", + "-t", + key_type, + "-N", + "", + "-C", + f"pobsync:{credential.name}", + "-f", + str(private_key), + ], + check=False, + stdin=subprocess.DEVNULL, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + timeout=15, + ) + if result.returncode != 0: + raise SshKeyError(result.stderr.strip() or "ssh-keygen failed.") + + os.chmod(private_key, 0o600) + public_key = public_key_file.read_text(encoding="utf-8").strip() + fingerprint = fingerprint_for_key(private_key) + + credential.private_key = "" + credential.public_key = public_key + credential.key_path = str(private_key) + credential.key_type = key_type + credential.fingerprint = fingerprint + credential.generated = True + credential.save(update_fields=["private_key", "public_key", "key_path", "key_type", "fingerprint", "generated", "updated_at"]) + return credential + + +def fingerprint_for_key(private_key: Path) -> str: + result = subprocess.run( + ["ssh-keygen", "-lf", str(private_key)], + check=False, + stdin=subprocess.DEVNULL, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + timeout=5, + ) + if result.returncode != 0: + raise SshKeyError(result.stderr.strip() or "Could not fingerprint SSH key.") + return result.stdout.strip() + + +def delete_generated_key_files(credential: SshCredential) -> None: + path = identity_path(credential) + allowed_root = (Path(settings.POBSYNC_HOME) / "state" / "ssh-credentials").resolve() + try: + resolved = path.resolve() + except FileNotFoundError: + resolved = path + + if allowed_root not in resolved.parents: + raise SshKeyError(f"Refusing to delete key outside {allowed_root}.") + + path.unlink(missing_ok=True) + path.with_suffix(path.suffix + ".pub").unlink(missing_ok=True) + if path.name == "identity": + (path.parent / "identity.pub").unlink(missing_ok=True) diff --git a/src/pobsync_backend/templates/pobsync_backend/ssh_credential_form.html b/src/pobsync_backend/templates/pobsync_backend/ssh_credential_form.html index bf55937..ee061bb 100644 --- a/src/pobsync_backend/templates/pobsync_backend/ssh_credential_form.html +++ b/src/pobsync_backend/templates/pobsync_backend/ssh_credential_form.html @@ -11,6 +11,18 @@

{% if credential %}Edit SSH Credential{% else %}Create SSH Credential{% endif %}

+ {% if credential and credential.public_key %} +
+ +
{{ credential.public_key }}
+
+ {% endif %} + {% if credential and credential.key_path %} +

Private key path: {{ credential.key_path }}

+ {% endif %} + {% if credential and credential.fingerprint %} +

Fingerprint: {{ credential.fingerprint }}

+ {% endif %}
{% csrf_token %} {{ form.non_field_errors }} @@ -29,4 +41,14 @@
+ + {% if credential %} +
+

Delete SSH Key

+
+ {% csrf_token %} + +
+
+ {% endif %} {% endblock %} diff --git a/src/pobsync_backend/templates/pobsync_backend/ssh_credential_generate.html b/src/pobsync_backend/templates/pobsync_backend/ssh_credential_generate.html new file mode 100644 index 0000000..7d8b593 --- /dev/null +++ b/src/pobsync_backend/templates/pobsync_backend/ssh_credential_generate.html @@ -0,0 +1,32 @@ +{% extends "pobsync_backend/base.html" %} + +{% block title %}Generate SSH Key | pobsync{% endblock %} + +{% block content %} +

Generate SSH Key

+ +
+ Back to SSH keys +
+ +
+

Create Key Pair

+
+ {% csrf_token %} + {{ form.non_field_errors }} + + {% for field in form %} +
+ {{ field.errors }} + + {{ field }} + {% if field.help_text %}
{{ field.help_text }}
{% endif %} +
+ {% endfor %} + +
+ +
+
+
+{% endblock %} diff --git a/src/pobsync_backend/templates/pobsync_backend/ssh_credentials.html b/src/pobsync_backend/templates/pobsync_backend/ssh_credentials.html index ff4c1a3..71ec49e 100644 --- a/src/pobsync_backend/templates/pobsync_backend/ssh_credentials.html +++ b/src/pobsync_backend/templates/pobsync_backend/ssh_credentials.html @@ -6,7 +6,8 @@

SSH Keys

- New SSH key + Generate SSH key + Add existing key Back to dashboard
@@ -16,7 +17,9 @@ Name + Type Public key + Fingerprint Known hosts Hosts Updated @@ -26,13 +29,15 @@ {% for credential in credentials %} {{ credential.name }} - {{ credential.public_key|yesno:"yes,no" }} + {{ credential.key_type }} + {% if credential.public_key %}{{ credential.public_key|truncatechars:44 }}{% else %}no{% endif %} + {% if credential.fingerprint %}{{ credential.fingerprint }}{% else %}unknown{% endif %} {{ credential.known_hosts|yesno:"yes,no" }} {{ credential.hosts.count }} {{ credential.updated_at }} {% empty %} - No SSH credentials configured yet. + No SSH credentials configured yet. {% endfor %} diff --git a/src/pobsync_backend/tests/test_django_config_source.py b/src/pobsync_backend/tests/test_django_config_source.py index 0f3d822..1fe0887 100644 --- a/src/pobsync_backend/tests/test_django_config_source.py +++ b/src/pobsync_backend/tests/test_django_config_source.py @@ -115,3 +115,21 @@ class DjangoConfigSourceTests(TestCase): self.assertFalse(global_identity_file.exists()) self.assertIn(f"-oIdentityFile={host_identity_file}", cfg["ssh"]["options"]) + + def test_filesystem_ssh_credential_uses_existing_key_path(self) -> None: + with TemporaryDirectory() as tmp: + identity_file = Path(tmp) / "identity" + identity_file.write_text("PRIVATE KEY\n", encoding="utf-8") + identity_file.chmod(0o600) + credential = SshCredential.objects.create(name="backup-key", key_path=str(identity_file), public_key="PUBLIC") + GlobalConfig.objects.create( + name="default", + backup_root="/backups", + pobsync_home="/opt/pobsync", + default_ssh_credential=credential, + ) + HostConfig.objects.create(host="web-01", address="web-01.example.test") + + cfg = DjangoConfigSource().effective_config_for_host("web-01") + + self.assertIn(f"-oIdentityFile={identity_file}", cfg["ssh"]["options"]) diff --git a/src/pobsync_backend/tests/test_ssh_credentials.py b/src/pobsync_backend/tests/test_ssh_credentials.py index 760a4c3..a1fd728 100644 --- a/src/pobsync_backend/tests/test_ssh_credentials.py +++ b/src/pobsync_backend/tests/test_ssh_credentials.py @@ -5,9 +5,11 @@ from pathlib import Path from tempfile import TemporaryDirectory from django import forms -from django.test import SimpleTestCase +from django.core.management import call_command +from django.test import SimpleTestCase, TestCase, override_settings from pobsync_backend.forms import normalize_private_key, validate_ssh_private_key +from pobsync_backend.models import GlobalConfig, SshCredential class SshCredentialValidationTests(SimpleTestCase): @@ -38,3 +40,23 @@ class SshCredentialValidationTests(SimpleTestCase): validate_ssh_private_key("-----BEGIN RSA PRIVATE KEY-----\nabc\n-----END RSA PRIVATE KEY-----") self.assertIn("PEM private keys are not supported", str(exc.exception)) + + +class SshCredentialManagementTests(TestCase): + def test_ensure_ssh_key_command_generates_default_key(self) -> None: + with TemporaryDirectory() as tmp, override_settings(POBSYNC_HOME=str(Path(tmp) / "home")): + call_command("ensure_pobsync_ssh_key", "--name", "default") + + credential = SshCredential.objects.get(name="default") + self.assertTrue(credential.generated) + self.assertTrue(Path(credential.key_path).exists()) + self.assertTrue(credential.public_key.startswith("ssh-ed25519 ")) + + def test_ensure_ssh_key_command_sets_global_default_when_available(self) -> None: + global_config = GlobalConfig.objects.create(name="default", backup_root="/backups") + + with TemporaryDirectory() as tmp, override_settings(POBSYNC_HOME=str(Path(tmp) / "home")): + call_command("ensure_pobsync_ssh_key", "--name", "default", "--set-global-default") + + global_config.refresh_from_db() + self.assertEqual(global_config.default_ssh_credential.name, "default") diff --git a/src/pobsync_backend/tests/test_views.py b/src/pobsync_backend/tests/test_views.py index 5f8e6aa..ad7a41c 100644 --- a/src/pobsync_backend/tests/test_views.py +++ b/src/pobsync_backend/tests/test_views.py @@ -163,6 +163,45 @@ class ViewTests(TestCase): self.assertEqual(credential.private_key, "UPLOADED PRIVATE KEY\n") self.assertEqual(credential.public_key, "DERIVED PUBLIC KEY") + def test_ssh_credentials_view_generates_filesystem_key(self) -> None: + self.client.force_login(self.staff_user) + with TemporaryDirectory() as tmp, override_settings(POBSYNC_HOME=str(Path(tmp) / "home")): + response = self.client.post( + reverse("generate_ssh_credential"), + { + "name": "generated-key", + "key_type": "ed25519", + "set_global_default": "", + "known_hosts": "", + "notes": "generated", + }, + follow=True, + ) + + self.assertRedirects(response, reverse("ssh_credentials")) + self.assertContains(response, "SSH key generated for generated-key.") + credential = SshCredential.objects.get(name="generated-key") + self.assertTrue(credential.generated) + self.assertEqual(credential.private_key, "") + self.assertTrue(credential.public_key.startswith("ssh-ed25519 ")) + self.assertTrue(Path(credential.key_path).exists()) + + def test_ssh_credentials_view_deletes_unused_generated_key(self) -> None: + self.client.force_login(self.staff_user) + with TemporaryDirectory() as tmp, override_settings(POBSYNC_HOME=str(Path(tmp) / "home")): + credential = SshCredential.objects.create(name="generated-key") + from pobsync_backend.ssh_keys import generate_ssh_key + + generate_ssh_key(credential) + key_path = Path(credential.key_path) + + response = self.client.post(reverse("delete_ssh_credential", args=[credential.id]), follow=True) + + self.assertRedirects(response, reverse("ssh_credentials")) + self.assertContains(response, "SSH key deleted: generated-key.") + self.assertFalse(SshCredential.objects.exists()) + self.assertFalse(key_path.exists()) + def test_ssh_credentials_view_rejects_invalid_key(self) -> None: self.client.force_login(self.staff_user) @@ -284,6 +323,15 @@ class ViewTests(TestCase): self.assertEqual(config.retention_daily, 7) self.assertEqual(config.retention_yearly, 1) + def test_global_config_form_defaults_to_first_generated_key(self) -> None: + self.client.force_login(self.staff_user) + credential = SshCredential.objects.create(name="default", key_path="/var/lib/pobsync/state/ssh-credentials/1/identity") + + response = self.client.get(reverse("edit_global_config")) + + self.assertEqual(response.status_code, 200) + self.assertContains(response, f'value="{credential.id}" selected') + def test_global_config_form_renders_static_container_backup_root_on_edit(self) -> None: self.client.force_login(self.staff_user) GlobalConfig.objects.create( diff --git a/src/pobsync_backend/views.py b/src/pobsync_backend/views.py index 22a6b40..505e302 100644 --- a/src/pobsync_backend/views.py +++ b/src/pobsync_backend/views.py @@ -21,6 +21,7 @@ from .forms import ( HostConfigForm, ManualBackupForm, RetentionApplyForm, + SshCredentialGenerateForm, ScheduleConfigForm, SshCredentialForm, ) @@ -29,6 +30,7 @@ from .models import BackupRun, GlobalConfig, HostConfig, ScheduleConfig, Snapsho from .retention import run_sql_retention_apply, run_sql_retention_plan from .self_check import collect_self_checks, summarize_self_checks from .snapshot_discovery import discover_snapshots, inspect_snapshot_discovery +from .ssh_keys import SshKeyError, delete_generated_key_files, generate_ssh_key @staff_member_required @@ -110,6 +112,42 @@ def create_ssh_credential(request): ) +@staff_member_required +def generate_ssh_credential(request): + if request.method == "POST": + form = SshCredentialGenerateForm(request.POST) + if form.is_valid(): + credential = SshCredential.objects.create( + name=form.cleaned_data["name"], + key_type=form.cleaned_data["key_type"], + known_hosts=form.cleaned_data["known_hosts"], + notes=form.cleaned_data["notes"], + ) + try: + credential = generate_ssh_key(credential, key_type=form.cleaned_data["key_type"]) + except SshKeyError as exc: + credential.delete() + form.add_error(None, str(exc)) + else: + if form.cleaned_data["set_global_default"]: + global_config = GlobalConfig.objects.filter(name="default").first() + if global_config is not None: + global_config.default_ssh_credential = credential + global_config.save(update_fields=["default_ssh_credential", "updated_at"]) + messages.success(request, f"SSH key generated for {credential.name}.") + return redirect("ssh_credentials") + else: + form = SshCredentialGenerateForm() + + return render( + request, + "pobsync_backend/ssh_credential_generate.html", + { + "form": form, + }, + ) + + @staff_member_required def edit_ssh_credential(request, credential_id: int): credential = get_object_or_404(SshCredential, id=credential_id) @@ -132,6 +170,27 @@ def edit_ssh_credential(request, credential_id: int): ) +@staff_member_required +@require_POST +def delete_ssh_credential(request, credential_id: int): + credential = get_object_or_404(SshCredential, id=credential_id) + if credential.hosts.exists() or credential.global_configs.exists(): + messages.error(request, f"SSH key {credential.name} is still in use and cannot be deleted.") + return redirect("edit_ssh_credential", credential_id=credential.id) + + name = credential.name + try: + if credential.generated or credential.key_path: + delete_generated_key_files(credential) + except SshKeyError as exc: + messages.error(request, f"Could not delete SSH key files for {name}: {exc}") + return redirect("edit_ssh_credential", credential_id=credential.id) + + credential.delete() + messages.success(request, f"SSH key deleted: {name}.") + return redirect("ssh_credentials") + + @staff_member_required def edit_global_config(request): global_config = GlobalConfig.objects.filter(name="default").first() @@ -434,6 +493,7 @@ def _default_schedule_initial() -> dict[str, object]: def _default_global_initial() -> dict[str, object]: return { "name": "default", + "default_ssh_credential": SshCredential.objects.order_by("name").first(), "ssh_user": "root", "ssh_port": 22, "rsync_binary": "rsync", diff --git a/src/pobsync_server/urls.py b/src/pobsync_server/urls.py index 7ea2a71..4cfea68 100644 --- a/src/pobsync_server/urls.py +++ b/src/pobsync_server/urls.py @@ -13,7 +13,9 @@ urlpatterns = [ path("config/global/", views.edit_global_config, name="edit_global_config"), path("ssh-credentials/", views.ssh_credentials, name="ssh_credentials"), path("ssh-credentials/new/", views.create_ssh_credential, name="create_ssh_credential"), + path("ssh-credentials/generate/", views.generate_ssh_credential, name="generate_ssh_credential"), path("ssh-credentials//", views.edit_ssh_credential, name="edit_ssh_credential"), + path("ssh-credentials//delete/", views.delete_ssh_credential, name="delete_ssh_credential"), path("hosts/new/", views.create_host_config, name="create_host_config"), path("hosts//", views.host_detail, name="host_detail"), path("hosts//config/", views.edit_host_config, name="edit_host_config"),