(bugfix) Add SSH private key file upload
Allow SSH credentials to be created from an uploaded private key file as an alternative to pasting the key into a textarea. Use multipart form handling in the credential views so server-side keys can be imported without copy/paste wrapping or formatting damage. Cover the upload path with a view test while keeping existing pasted key validation behavior intact.
This commit is contained in:
@@ -147,11 +147,16 @@ class ManualBackupForm(forms.Form):
|
|||||||
|
|
||||||
|
|
||||||
class SshCredentialForm(forms.ModelForm):
|
class SshCredentialForm(forms.ModelForm):
|
||||||
|
private_key_file = forms.FileField(
|
||||||
|
required=False,
|
||||||
|
help_text="Optional. Upload the private key file directly to avoid copy/paste formatting problems.",
|
||||||
|
)
|
||||||
private_key = forms.CharField(
|
private_key = forms.CharField(
|
||||||
widget=forms.Textarea(attrs={"spellcheck": "false", "autocomplete": "off"}),
|
widget=forms.Textarea(attrs={"spellcheck": "false", "autocomplete": "off"}),
|
||||||
|
required=False,
|
||||||
help_text=(
|
help_text=(
|
||||||
"Paste the complete unencrypted OpenSSH private key, including BEGIN/END lines. "
|
"Paste the complete unencrypted OpenSSH private key, including BEGIN/END lines. "
|
||||||
"Use the matching public key in the field below only as a cross-check."
|
"Leave empty when uploading a private key file."
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
public_key = forms.CharField(
|
public_key = forms.CharField(
|
||||||
@@ -171,7 +176,19 @@ class SshCredentialForm(forms.ModelForm):
|
|||||||
fields = ("name", "private_key", "public_key", "known_hosts", "notes")
|
fields = ("name", "private_key", "public_key", "known_hosts", "notes")
|
||||||
|
|
||||||
def clean_private_key(self) -> str:
|
def clean_private_key(self) -> str:
|
||||||
private_key = normalize_private_key(self.cleaned_data["private_key"])
|
uploaded_file = self.files.get("private_key_file")
|
||||||
|
if uploaded_file:
|
||||||
|
try:
|
||||||
|
raw_private_key = uploaded_file.read().decode("utf-8")
|
||||||
|
except UnicodeDecodeError as exc:
|
||||||
|
raise forms.ValidationError("SSH private key files must be UTF-8 text files.") from exc
|
||||||
|
else:
|
||||||
|
raw_private_key = self.cleaned_data.get("private_key", "")
|
||||||
|
|
||||||
|
if not raw_private_key.strip():
|
||||||
|
raise forms.ValidationError("Paste a private key or upload a private key file.")
|
||||||
|
|
||||||
|
private_key = normalize_private_key(raw_private_key)
|
||||||
public_key = validate_ssh_private_key(private_key)
|
public_key = validate_ssh_private_key(private_key)
|
||||||
self.derived_public_key = public_key
|
self.derived_public_key = public_key
|
||||||
return f"{private_key}\n"
|
return f"{private_key}\n"
|
||||||
|
|||||||
@@ -11,7 +11,7 @@
|
|||||||
|
|
||||||
<section class="panel">
|
<section class="panel">
|
||||||
<h2>{% if credential %}Edit SSH Credential{% else %}Create SSH Credential{% endif %}</h2>
|
<h2>{% if credential %}Edit SSH Credential{% else %}Create SSH Credential{% endif %}</h2>
|
||||||
<form method="post" class="form-grid">
|
<form method="post" enctype="multipart/form-data" class="form-grid">
|
||||||
{% csrf_token %}
|
{% csrf_token %}
|
||||||
{{ form.non_field_errors }}
|
{{ form.non_field_errors }}
|
||||||
|
|
||||||
|
|||||||
@@ -6,6 +6,7 @@ from tempfile import TemporaryDirectory
|
|||||||
from unittest.mock import patch
|
from unittest.mock import patch
|
||||||
|
|
||||||
from django.contrib.auth import get_user_model
|
from django.contrib.auth import get_user_model
|
||||||
|
from django.core.files.uploadedfile import SimpleUploadedFile
|
||||||
from django.test import TestCase, override_settings
|
from django.test import TestCase, override_settings
|
||||||
from django.urls import reverse
|
from django.urls import reverse
|
||||||
|
|
||||||
@@ -117,6 +118,29 @@ class ViewTests(TestCase):
|
|||||||
self.assertEqual(credential.private_key, "PRIVATE KEY\n")
|
self.assertEqual(credential.private_key, "PRIVATE KEY\n")
|
||||||
self.assertEqual(credential.public_key, "DERIVED PUBLIC KEY")
|
self.assertEqual(credential.public_key, "DERIVED PUBLIC KEY")
|
||||||
|
|
||||||
|
def test_ssh_credentials_view_creates_key_from_uploaded_file(self) -> None:
|
||||||
|
self.client.force_login(self.staff_user)
|
||||||
|
uploaded_key = SimpleUploadedFile("id_ed25519", b"UPLOADED PRIVATE KEY\n", content_type="text/plain")
|
||||||
|
|
||||||
|
with patch("pobsync_backend.forms.validate_ssh_private_key", return_value="DERIVED PUBLIC KEY"):
|
||||||
|
response = self.client.post(
|
||||||
|
reverse("create_ssh_credential"),
|
||||||
|
{
|
||||||
|
"name": "backup-key",
|
||||||
|
"private_key_file": uploaded_key,
|
||||||
|
"private_key": "",
|
||||||
|
"public_key": "",
|
||||||
|
"known_hosts": "",
|
||||||
|
"notes": "uploaded",
|
||||||
|
},
|
||||||
|
follow=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
self.assertRedirects(response, reverse("ssh_credentials"))
|
||||||
|
credential = SshCredential.objects.get(name="backup-key")
|
||||||
|
self.assertEqual(credential.private_key, "UPLOADED PRIVATE KEY\n")
|
||||||
|
self.assertEqual(credential.public_key, "DERIVED PUBLIC KEY")
|
||||||
|
|
||||||
def test_ssh_credentials_view_rejects_invalid_key(self) -> None:
|
def test_ssh_credentials_view_rejects_invalid_key(self) -> None:
|
||||||
self.client.force_login(self.staff_user)
|
self.client.force_login(self.staff_user)
|
||||||
|
|
||||||
|
|||||||
@@ -83,7 +83,7 @@ def ssh_credentials(request):
|
|||||||
@staff_member_required
|
@staff_member_required
|
||||||
def create_ssh_credential(request):
|
def create_ssh_credential(request):
|
||||||
if request.method == "POST":
|
if request.method == "POST":
|
||||||
form = SshCredentialForm(request.POST)
|
form = SshCredentialForm(request.POST, request.FILES)
|
||||||
if form.is_valid():
|
if form.is_valid():
|
||||||
credential = form.save()
|
credential = form.save()
|
||||||
messages.success(request, f"SSH credential saved for {credential.name}.")
|
messages.success(request, f"SSH credential saved for {credential.name}.")
|
||||||
@@ -105,7 +105,7 @@ def create_ssh_credential(request):
|
|||||||
def edit_ssh_credential(request, credential_id: int):
|
def edit_ssh_credential(request, credential_id: int):
|
||||||
credential = get_object_or_404(SshCredential, id=credential_id)
|
credential = get_object_or_404(SshCredential, id=credential_id)
|
||||||
if request.method == "POST":
|
if request.method == "POST":
|
||||||
form = SshCredentialForm(request.POST, instance=credential)
|
form = SshCredentialForm(request.POST, request.FILES, instance=credential)
|
||||||
if form.is_valid():
|
if form.is_valid():
|
||||||
saved_credential = form.save()
|
saved_credential = form.save()
|
||||||
messages.success(request, f"SSH credential saved for {saved_credential.name}.")
|
messages.success(request, f"SSH credential saved for {saved_credential.name}.")
|
||||||
|
|||||||
Reference in New Issue
Block a user