From c018011e83e39d89126a3ff5639d8b38d9e5360a Mon Sep 17 00:00:00 2001 From: Peter van Arkel Date: Tue, 19 May 2026 15:22:40 +0200 Subject: [PATCH] (bugfix) Validate Django-managed SSH private keys Validate uploaded SSH private keys with ssh-keygen before saving them so invalid, malformed, or unsupported key material is rejected in the control panel instead of failing later during rsync. Auto-populate the public key when it is omitted, add an edit flow for existing SSH credentials, and cover create, update, and invalid-key paths with view tests. --- src/pobsync_backend/forms.py | 49 +++++++++++++ .../pobsync_backend/ssh_credential_form.html | 8 +-- .../pobsync_backend/ssh_credentials.html | 2 +- src/pobsync_backend/tests/test_views.py | 69 +++++++++++++++---- src/pobsync_backend/views.py | 23 +++++++ src/pobsync_server/urls.py | 1 + 6 files changed, 134 insertions(+), 18 deletions(-) diff --git a/src/pobsync_backend/forms.py b/src/pobsync_backend/forms.py index 3328d9b..a812925 100644 --- a/src/pobsync_backend/forms.py +++ b/src/pobsync_backend/forms.py @@ -1,5 +1,10 @@ from __future__ import annotations +import os +import subprocess +from pathlib import Path +from tempfile import TemporaryDirectory + from django import forms from django.conf import settings @@ -157,6 +162,18 @@ class SshCredentialForm(forms.ModelForm): model = SshCredential fields = ("name", "private_key", "public_key", "known_hosts", "notes") + def clean_private_key(self) -> str: + private_key = self.cleaned_data["private_key"].strip() + public_key = validate_ssh_private_key(private_key) + self.derived_public_key = public_key + return f"{private_key}\n" + + def clean(self): + cleaned_data = super().clean() + if cleaned_data.get("private_key") and not cleaned_data.get("public_key") and hasattr(self, "derived_public_key"): + cleaned_data["public_key"] = self.derived_public_key + return cleaned_data + class RetentionApplyForm(forms.Form): kind = forms.ChoiceField(choices=(("scheduled", "Scheduled"), ("manual", "Manual"), ("all", "All"))) @@ -201,3 +218,35 @@ class ScheduleConfigForm(forms.ModelForm): except ValueError as exc: raise forms.ValidationError(str(exc)) from exc return cron_expr + + +def validate_ssh_private_key(private_key: str) -> str: + with TemporaryDirectory() as tmp: + key_path = Path(tmp) / "identity" + key_path.write_text(f"{private_key}\n", encoding="utf-8") + os.chmod(key_path, 0o600) + try: + result = subprocess.run( + ["ssh-keygen", "-y", "-f", str(key_path)], + check=False, + stdin=subprocess.DEVNULL, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + timeout=5, + ) + except FileNotFoundError as exc: + raise forms.ValidationError("ssh-keygen is not available in this container.") from exc + except subprocess.TimeoutExpired as exc: + raise forms.ValidationError("Could not validate SSH private key before timeout.") from exc + + if result.returncode != 0: + message = result.stderr.strip() or "OpenSSH could not read this private key." + if "passphrase" in message.lower(): + message = "Encrypted SSH private keys are not supported for unattended backups." + raise forms.ValidationError(f"Invalid SSH private key: {message}") + + public_key = result.stdout.strip() + if not public_key: + raise forms.ValidationError("Invalid SSH private key: no public key could be derived.") + return public_key diff --git a/src/pobsync_backend/templates/pobsync_backend/ssh_credential_form.html b/src/pobsync_backend/templates/pobsync_backend/ssh_credential_form.html index b98ccdb..0c61024 100644 --- a/src/pobsync_backend/templates/pobsync_backend/ssh_credential_form.html +++ b/src/pobsync_backend/templates/pobsync_backend/ssh_credential_form.html @@ -1,16 +1,16 @@ {% extends "pobsync_backend/base.html" %} -{% block title %}New SSH Key | pobsync{% endblock %} +{% block title %}{% if credential %}SSH Key | {{ credential.name }}{% else %}New SSH Key{% endif %} | pobsync{% endblock %} {% block content %} -

New SSH Key

+

{% if credential %}SSH Key: {{ credential.name }}{% else %}New SSH Key{% endif %}

Back to SSH keys
-

Create SSH Credential

+

{% if credential %}Edit SSH Credential{% else %}Create SSH Credential{% endif %}

{% csrf_token %} {{ form.non_field_errors }} @@ -25,7 +25,7 @@ {% endfor %}
- +
diff --git a/src/pobsync_backend/templates/pobsync_backend/ssh_credentials.html b/src/pobsync_backend/templates/pobsync_backend/ssh_credentials.html index 9925ae8..ff4c1a3 100644 --- a/src/pobsync_backend/templates/pobsync_backend/ssh_credentials.html +++ b/src/pobsync_backend/templates/pobsync_backend/ssh_credentials.html @@ -25,7 +25,7 @@ {% for credential in credentials %} - {{ credential.name }} + {{ credential.name }} {{ credential.public_key|yesno:"yes,no" }} {{ credential.known_hosts|yesno:"yes,no" }} {{ credential.hosts.count }} diff --git a/src/pobsync_backend/tests/test_views.py b/src/pobsync_backend/tests/test_views.py index d31807a..d0efdf0 100644 --- a/src/pobsync_backend/tests/test_views.py +++ b/src/pobsync_backend/tests/test_views.py @@ -3,6 +3,7 @@ from __future__ import annotations from datetime import datetime, timezone from pathlib import Path from tempfile import TemporaryDirectory +from unittest.mock import patch from django.contrib.auth import get_user_model from django.test import TestCase, override_settings @@ -85,24 +86,66 @@ class ViewTests(TestCase): def test_ssh_credentials_view_creates_key(self) -> None: self.client.force_login(self.staff_user) - response = self.client.post( - reverse("create_ssh_credential"), - { - "name": "backup-key", - "private_key": "PRIVATE KEY", - "public_key": "PUBLIC KEY", - "known_hosts": "web-01.example.test ssh-ed25519 AAAATEST", - "notes": "production backup key", - }, - follow=True, - ) + with patch("pobsync_backend.forms.validate_ssh_private_key", return_value="DERIVED PUBLIC KEY"): + response = self.client.post( + reverse("create_ssh_credential"), + { + "name": "backup-key", + "private_key": "PRIVATE KEY", + "public_key": "", + "known_hosts": "web-01.example.test ssh-ed25519 AAAATEST", + "notes": "production backup key", + }, + follow=True, + ) self.assertRedirects(response, reverse("ssh_credentials")) self.assertContains(response, "SSH credential saved for backup-key.") self.assertContains(response, "backup-key") credential = SshCredential.objects.get(name="backup-key") - self.assertEqual(credential.private_key, "PRIVATE KEY") - self.assertEqual(credential.public_key, "PUBLIC KEY") + self.assertEqual(credential.private_key, "PRIVATE KEY\n") + self.assertEqual(credential.public_key, "DERIVED PUBLIC KEY") + + def test_ssh_credentials_view_rejects_invalid_key(self) -> None: + self.client.force_login(self.staff_user) + + response = self.client.post( + reverse("create_ssh_credential"), + { + "name": "bad-key", + "private_key": "not a private key", + "public_key": "", + "known_hosts": "", + "notes": "", + }, + ) + + self.assertEqual(response.status_code, 200) + self.assertContains(response, "Invalid SSH private key") + self.assertFalse(SshCredential.objects.exists()) + + def test_ssh_credentials_view_updates_existing_key(self) -> None: + self.client.force_login(self.staff_user) + credential = SshCredential.objects.create(name="backup-key", private_key="OLD KEY") + + with patch("pobsync_backend.forms.validate_ssh_private_key", return_value="UPDATED PUBLIC KEY"): + response = self.client.post( + reverse("edit_ssh_credential", args=[credential.id]), + { + "name": "backup-key", + "private_key": "UPDATED KEY", + "public_key": "", + "known_hosts": "", + "notes": "rotated", + }, + follow=True, + ) + + self.assertRedirects(response, reverse("ssh_credentials")) + credential.refresh_from_db() + self.assertEqual(credential.private_key, "UPDATED KEY\n") + self.assertEqual(credential.public_key, "UPDATED PUBLIC KEY") + self.assertEqual(credential.notes, "rotated") def test_global_config_form_creates_default_config(self) -> None: self.client.force_login(self.staff_user) diff --git a/src/pobsync_backend/views.py b/src/pobsync_backend/views.py index 0628544..4305268 100644 --- a/src/pobsync_backend/views.py +++ b/src/pobsync_backend/views.py @@ -82,6 +82,29 @@ def create_ssh_credential(request): "pobsync_backend/ssh_credential_form.html", { "form": form, + "credential": None, + }, + ) + + +@staff_member_required +def edit_ssh_credential(request, credential_id: int): + credential = get_object_or_404(SshCredential, id=credential_id) + if request.method == "POST": + form = SshCredentialForm(request.POST, instance=credential) + if form.is_valid(): + saved_credential = form.save() + messages.success(request, f"SSH credential saved for {saved_credential.name}.") + return redirect("ssh_credentials") + else: + form = SshCredentialForm(instance=credential) + + return render( + request, + "pobsync_backend/ssh_credential_form.html", + { + "form": form, + "credential": credential, }, ) diff --git a/src/pobsync_server/urls.py b/src/pobsync_server/urls.py index 49dcd9c..5b17567 100644 --- a/src/pobsync_server/urls.py +++ b/src/pobsync_server/urls.py @@ -11,6 +11,7 @@ urlpatterns = [ path("config/global/", views.edit_global_config, name="edit_global_config"), path("ssh-credentials/", views.ssh_credentials, name="ssh_credentials"), path("ssh-credentials/new/", views.create_ssh_credential, name="create_ssh_credential"), + path("ssh-credentials//", views.edit_ssh_credential, name="edit_ssh_credential"), path("hosts/new/", views.create_host_config, name="create_host_config"), path("hosts//", views.host_detail, name="host_detail"), path("hosts//config/", views.edit_host_config, name="edit_host_config"),