diff --git a/src/pobsync_backend/forms.py b/src/pobsync_backend/forms.py
index 3328d9b..a812925 100644
--- a/src/pobsync_backend/forms.py
+++ b/src/pobsync_backend/forms.py
@@ -1,5 +1,10 @@
from __future__ import annotations
+import os
+import subprocess
+from pathlib import Path
+from tempfile import TemporaryDirectory
+
from django import forms
from django.conf import settings
@@ -157,6 +162,18 @@ class SshCredentialForm(forms.ModelForm):
model = SshCredential
fields = ("name", "private_key", "public_key", "known_hosts", "notes")
+ def clean_private_key(self) -> str:
+ private_key = self.cleaned_data["private_key"].strip()
+ public_key = validate_ssh_private_key(private_key)
+ self.derived_public_key = public_key
+ return f"{private_key}\n"
+
+ def clean(self):
+ cleaned_data = super().clean()
+ if cleaned_data.get("private_key") and not cleaned_data.get("public_key") and hasattr(self, "derived_public_key"):
+ cleaned_data["public_key"] = self.derived_public_key
+ return cleaned_data
+
class RetentionApplyForm(forms.Form):
kind = forms.ChoiceField(choices=(("scheduled", "Scheduled"), ("manual", "Manual"), ("all", "All")))
@@ -201,3 +218,35 @@ class ScheduleConfigForm(forms.ModelForm):
except ValueError as exc:
raise forms.ValidationError(str(exc)) from exc
return cron_expr
+
+
+def validate_ssh_private_key(private_key: str) -> str:
+ with TemporaryDirectory() as tmp:
+ key_path = Path(tmp) / "identity"
+ key_path.write_text(f"{private_key}\n", encoding="utf-8")
+ os.chmod(key_path, 0o600)
+ try:
+ result = subprocess.run(
+ ["ssh-keygen", "-y", "-f", str(key_path)],
+ check=False,
+ stdin=subprocess.DEVNULL,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ text=True,
+ timeout=5,
+ )
+ except FileNotFoundError as exc:
+ raise forms.ValidationError("ssh-keygen is not available in this container.") from exc
+ except subprocess.TimeoutExpired as exc:
+ raise forms.ValidationError("Could not validate SSH private key before timeout.") from exc
+
+ if result.returncode != 0:
+ message = result.stderr.strip() or "OpenSSH could not read this private key."
+ if "passphrase" in message.lower():
+ message = "Encrypted SSH private keys are not supported for unattended backups."
+ raise forms.ValidationError(f"Invalid SSH private key: {message}")
+
+ public_key = result.stdout.strip()
+ if not public_key:
+ raise forms.ValidationError("Invalid SSH private key: no public key could be derived.")
+ return public_key
diff --git a/src/pobsync_backend/templates/pobsync_backend/ssh_credential_form.html b/src/pobsync_backend/templates/pobsync_backend/ssh_credential_form.html
index b98ccdb..0c61024 100644
--- a/src/pobsync_backend/templates/pobsync_backend/ssh_credential_form.html
+++ b/src/pobsync_backend/templates/pobsync_backend/ssh_credential_form.html
@@ -1,16 +1,16 @@
{% extends "pobsync_backend/base.html" %}
-{% block title %}New SSH Key | pobsync{% endblock %}
+{% block title %}{% if credential %}SSH Key | {{ credential.name }}{% else %}New SSH Key{% endif %} | pobsync{% endblock %}
{% block content %}
-
New SSH Key
+ {% if credential %}SSH Key: {{ credential.name }}{% else %}New SSH Key{% endif %}
- Create SSH Credential
+ {% if credential %}Edit SSH Credential{% else %}Create SSH Credential{% endif %}
diff --git a/src/pobsync_backend/templates/pobsync_backend/ssh_credentials.html b/src/pobsync_backend/templates/pobsync_backend/ssh_credentials.html
index 9925ae8..ff4c1a3 100644
--- a/src/pobsync_backend/templates/pobsync_backend/ssh_credentials.html
+++ b/src/pobsync_backend/templates/pobsync_backend/ssh_credentials.html
@@ -25,7 +25,7 @@
{% for credential in credentials %}
- | {{ credential.name }} |
+ {{ credential.name }} |
{{ credential.public_key|yesno:"yes,no" }} |
{{ credential.known_hosts|yesno:"yes,no" }} |
{{ credential.hosts.count }} |
diff --git a/src/pobsync_backend/tests/test_views.py b/src/pobsync_backend/tests/test_views.py
index d31807a..d0efdf0 100644
--- a/src/pobsync_backend/tests/test_views.py
+++ b/src/pobsync_backend/tests/test_views.py
@@ -3,6 +3,7 @@ from __future__ import annotations
from datetime import datetime, timezone
from pathlib import Path
from tempfile import TemporaryDirectory
+from unittest.mock import patch
from django.contrib.auth import get_user_model
from django.test import TestCase, override_settings
@@ -85,24 +86,66 @@ class ViewTests(TestCase):
def test_ssh_credentials_view_creates_key(self) -> None:
self.client.force_login(self.staff_user)
- response = self.client.post(
- reverse("create_ssh_credential"),
- {
- "name": "backup-key",
- "private_key": "PRIVATE KEY",
- "public_key": "PUBLIC KEY",
- "known_hosts": "web-01.example.test ssh-ed25519 AAAATEST",
- "notes": "production backup key",
- },
- follow=True,
- )
+ with patch("pobsync_backend.forms.validate_ssh_private_key", return_value="DERIVED PUBLIC KEY"):
+ response = self.client.post(
+ reverse("create_ssh_credential"),
+ {
+ "name": "backup-key",
+ "private_key": "PRIVATE KEY",
+ "public_key": "",
+ "known_hosts": "web-01.example.test ssh-ed25519 AAAATEST",
+ "notes": "production backup key",
+ },
+ follow=True,
+ )
self.assertRedirects(response, reverse("ssh_credentials"))
self.assertContains(response, "SSH credential saved for backup-key.")
self.assertContains(response, "backup-key")
credential = SshCredential.objects.get(name="backup-key")
- self.assertEqual(credential.private_key, "PRIVATE KEY")
- self.assertEqual(credential.public_key, "PUBLIC KEY")
+ self.assertEqual(credential.private_key, "PRIVATE KEY\n")
+ self.assertEqual(credential.public_key, "DERIVED PUBLIC KEY")
+
+ def test_ssh_credentials_view_rejects_invalid_key(self) -> None:
+ self.client.force_login(self.staff_user)
+
+ response = self.client.post(
+ reverse("create_ssh_credential"),
+ {
+ "name": "bad-key",
+ "private_key": "not a private key",
+ "public_key": "",
+ "known_hosts": "",
+ "notes": "",
+ },
+ )
+
+ self.assertEqual(response.status_code, 200)
+ self.assertContains(response, "Invalid SSH private key")
+ self.assertFalse(SshCredential.objects.exists())
+
+ def test_ssh_credentials_view_updates_existing_key(self) -> None:
+ self.client.force_login(self.staff_user)
+ credential = SshCredential.objects.create(name="backup-key", private_key="OLD KEY")
+
+ with patch("pobsync_backend.forms.validate_ssh_private_key", return_value="UPDATED PUBLIC KEY"):
+ response = self.client.post(
+ reverse("edit_ssh_credential", args=[credential.id]),
+ {
+ "name": "backup-key",
+ "private_key": "UPDATED KEY",
+ "public_key": "",
+ "known_hosts": "",
+ "notes": "rotated",
+ },
+ follow=True,
+ )
+
+ self.assertRedirects(response, reverse("ssh_credentials"))
+ credential.refresh_from_db()
+ self.assertEqual(credential.private_key, "UPDATED KEY\n")
+ self.assertEqual(credential.public_key, "UPDATED PUBLIC KEY")
+ self.assertEqual(credential.notes, "rotated")
def test_global_config_form_creates_default_config(self) -> None:
self.client.force_login(self.staff_user)
diff --git a/src/pobsync_backend/views.py b/src/pobsync_backend/views.py
index 0628544..4305268 100644
--- a/src/pobsync_backend/views.py
+++ b/src/pobsync_backend/views.py
@@ -82,6 +82,29 @@ def create_ssh_credential(request):
"pobsync_backend/ssh_credential_form.html",
{
"form": form,
+ "credential": None,
+ },
+ )
+
+
+@staff_member_required
+def edit_ssh_credential(request, credential_id: int):
+ credential = get_object_or_404(SshCredential, id=credential_id)
+ if request.method == "POST":
+ form = SshCredentialForm(request.POST, instance=credential)
+ if form.is_valid():
+ saved_credential = form.save()
+ messages.success(request, f"SSH credential saved for {saved_credential.name}.")
+ return redirect("ssh_credentials")
+ else:
+ form = SshCredentialForm(instance=credential)
+
+ return render(
+ request,
+ "pobsync_backend/ssh_credential_form.html",
+ {
+ "form": form,
+ "credential": credential,
},
)
diff --git a/src/pobsync_server/urls.py b/src/pobsync_server/urls.py
index 49dcd9c..5b17567 100644
--- a/src/pobsync_server/urls.py
+++ b/src/pobsync_server/urls.py
@@ -11,6 +11,7 @@ urlpatterns = [
path("config/global/", views.edit_global_config, name="edit_global_config"),
path("ssh-credentials/", views.ssh_credentials, name="ssh_credentials"),
path("ssh-credentials/new/", views.create_ssh_credential, name="create_ssh_credential"),
+ path("ssh-credentials//", views.edit_ssh_credential, name="edit_ssh_credential"),
path("hosts/new/", views.create_host_config, name="create_host_config"),
path("hosts//", views.host_detail, name="host_detail"),
path("hosts//config/", views.edit_host_config, name="edit_host_config"),