2 Commits

Author SHA1 Message Date
97797c574d (bugfix) Normalize pasted OpenSSH private keys
Canonicalize uploaded OpenSSH private keys before validation by
normalizing line endings, removing whitespace from the base64 body,
and re-wrapping it between the BEGIN and END markers.

Add SSH credential tests that generate a real ed25519 key, damage its
wrapping, and verify that validation succeeds after normalization.

Return a clearer validation error for PEM private keys, which are not
supported by the current credential flow.
2026-05-19 18:42:02 +02:00
c7cfb603b0 (bugfix) Improve SSH credential validation feedback
Normalize pasted private keys before validation and detect common SSH
credential mistakes, including public keys pasted into the private key
field and public keys that do not match the supplied private key.

Translate OpenSSH libcrypto parse failures into a clearer user-facing
message and disable browser spellcheck/autocomplete on SSH key fields.

Document the native update flow as git pull followed by the
non-interactive installer so deployments refresh cleanly.
2026-05-19 18:35:39 +02:00
4 changed files with 150 additions and 8 deletions

View File

@@ -154,9 +154,12 @@ From a fresh checkout or the existing app directory:
```
git pull
sudo scripts/install-systemd --app-dir /opt/pobsync/app
sudo scripts/install-systemd --non-interactive
```
The installer preserves an existing `/etc/pobsync/pobsync.env` unless you pass `--force-env`. It refreshes the installed
app, Python dependencies, migrations, static files, and systemd services.
Then check:
```

View File

@@ -1,6 +1,7 @@
from __future__ import annotations
import os
import textwrap
import subprocess
from pathlib import Path
from tempfile import TemporaryDirectory
@@ -147,12 +148,19 @@ class ManualBackupForm(forms.Form):
class SshCredentialForm(forms.ModelForm):
private_key = forms.CharField(
widget=forms.Textarea,
help_text="Private key used by the worker container for SSH backups.",
widget=forms.Textarea(attrs={"spellcheck": "false", "autocomplete": "off"}),
help_text=(
"Paste the complete unencrypted OpenSSH private key, including BEGIN/END lines. "
"Use the matching public key in the field below only as a cross-check."
),
)
public_key = forms.CharField(
widget=forms.Textarea(attrs={"spellcheck": "false", "autocomplete": "off"}),
required=False,
help_text="Optional. If set, pobsync verifies it matches the private key.",
)
public_key = forms.CharField(widget=forms.Textarea, required=False)
known_hosts = forms.CharField(
widget=forms.Textarea,
widget=forms.Textarea(attrs={"spellcheck": "false", "autocomplete": "off"}),
required=False,
help_text="Optional known_hosts entries. When set, StrictHostKeyChecking can stay enabled.",
)
@@ -163,14 +171,24 @@ class SshCredentialForm(forms.ModelForm):
fields = ("name", "private_key", "public_key", "known_hosts", "notes")
def clean_private_key(self) -> str:
private_key = self.cleaned_data["private_key"].strip()
private_key = normalize_private_key(self.cleaned_data["private_key"])
public_key = validate_ssh_private_key(private_key)
self.derived_public_key = public_key
return f"{private_key}\n"
def clean(self):
cleaned_data = super().clean()
if cleaned_data.get("private_key") and not cleaned_data.get("public_key") and hasattr(self, "derived_public_key"):
provided_public_key = normalize_public_key(cleaned_data.get("public_key", ""))
if provided_public_key:
cleaned_data["public_key"] = provided_public_key
if cleaned_data.get("private_key") and provided_public_key and hasattr(self, "derived_public_key"):
if public_key_identity(provided_public_key) != public_key_identity(self.derived_public_key):
self.add_error(
"public_key",
forms.ValidationError("Public key does not match the supplied private key."),
)
elif cleaned_data.get("private_key") and not provided_public_key and hasattr(self, "derived_public_key"):
cleaned_data["public_key"] = self.derived_public_key
return cleaned_data
@@ -220,7 +238,45 @@ class ScheduleConfigForm(forms.ModelForm):
return cron_expr
def normalize_private_key(private_key: str) -> str:
normalized = private_key.replace("\r\n", "\n").replace("\r", "\n").strip().lstrip("\ufeff")
begin_marker = "-----BEGIN OPENSSH PRIVATE KEY-----"
end_marker = "-----END OPENSSH PRIVATE KEY-----"
if begin_marker in normalized and end_marker in normalized:
before_body, after_begin = normalized.split(begin_marker, 1)
body, after_end = after_begin.split(end_marker, 1)
if before_body.strip() or after_end.strip():
return normalized
compact_body = "".join(body.split())
wrapped_body = "\n".join(textwrap.wrap(compact_body, width=70))
return f"{begin_marker}\n{wrapped_body}\n{end_marker}"
return normalized
def normalize_public_key(public_key: str) -> str:
return " ".join(public_key.strip().split())
def public_key_identity(public_key: str) -> str:
parts = normalize_public_key(public_key).split()
if len(parts) >= 2:
return " ".join(parts[:2])
return normalize_public_key(public_key)
def validate_ssh_private_key(private_key: str) -> str:
if "BEGIN OPENSSH PRIVATE KEY" not in private_key:
stripped = private_key.strip()
if stripped.startswith(("ssh-ed25519 ", "ssh-rsa ", "ecdsa-sha2-", "sk-")):
raise forms.ValidationError("This looks like a public key. Paste the private key in this field.")
if "BEGIN RSA PRIVATE KEY" in stripped or "BEGIN EC PRIVATE KEY" in stripped:
raise forms.ValidationError(
"PEM private keys are not supported here yet. Convert it to an unencrypted OpenSSH key first."
)
raise forms.ValidationError("Invalid SSH private key: missing OpenSSH private key header.")
with TemporaryDirectory() as tmp:
key_path = Path(tmp) / "identity"
key_path.write_text(f"{private_key}\n", encoding="utf-8")
@@ -242,8 +298,14 @@ def validate_ssh_private_key(private_key: str) -> str:
if result.returncode != 0:
message = result.stderr.strip() or "OpenSSH could not read this private key."
if "passphrase" in message.lower():
lower_message = message.lower()
if "passphrase" in lower_message:
message = "Encrypted SSH private keys are not supported for unattended backups."
elif "libcrypto" in lower_message:
message = (
"OpenSSH could not parse this key. It is usually incomplete, corrupted while copying, "
"or not an unencrypted OpenSSH private key."
)
raise forms.ValidationError(f"Invalid SSH private key: {message}")
public_key = result.stdout.strip()

View File

@@ -0,0 +1,40 @@
from __future__ import annotations
import subprocess
from pathlib import Path
from tempfile import TemporaryDirectory
from django import forms
from django.test import SimpleTestCase
from pobsync_backend.forms import normalize_private_key, validate_ssh_private_key
class SshCredentialValidationTests(SimpleTestCase):
def test_normalize_private_key_repairs_wrapped_openssh_body(self) -> None:
with TemporaryDirectory() as tmp:
key_path = Path(tmp) / "identity"
subprocess.run(
["ssh-keygen", "-t", "ed25519", "-N", "", "-C", "test", "-f", str(key_path)],
check=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
)
private_key = key_path.read_text(encoding="utf-8")
begin_marker = "-----BEGIN OPENSSH PRIVATE KEY-----"
end_marker = "-----END OPENSSH PRIVATE KEY-----"
body = private_key.split(begin_marker, 1)[1].split(end_marker, 1)[0]
damaged_body = " \n ".join(body.split())
damaged_key = f"{begin_marker}\n{damaged_body}\n{end_marker}"
normalized_key = normalize_private_key(damaged_key)
self.assertEqual(validate_ssh_private_key(normalized_key), validate_ssh_private_key(private_key))
def test_validate_private_key_rejects_pem_key_with_actionable_message(self) -> None:
with self.assertRaises(forms.ValidationError) as exc:
validate_ssh_private_key("-----BEGIN RSA PRIVATE KEY-----\nabc\n-----END RSA PRIVATE KEY-----")
self.assertIn("PEM private keys are not supported", str(exc.exception))

View File

@@ -135,6 +135,43 @@ class ViewTests(TestCase):
self.assertContains(response, "Invalid SSH private key")
self.assertFalse(SshCredential.objects.exists())
def test_ssh_credentials_view_rejects_public_key_in_private_key_field(self) -> None:
self.client.force_login(self.staff_user)
response = self.client.post(
reverse("create_ssh_credential"),
{
"name": "bad-key",
"private_key": "ssh-ed25519 AAAATEST root@backup",
"public_key": "",
"known_hosts": "",
"notes": "",
},
)
self.assertEqual(response.status_code, 200)
self.assertContains(response, "This looks like a public key")
self.assertFalse(SshCredential.objects.exists())
def test_ssh_credentials_view_rejects_mismatched_public_key(self) -> None:
self.client.force_login(self.staff_user)
with patch("pobsync_backend.forms.validate_ssh_private_key", return_value="ssh-ed25519 AAAADERIVED derived"):
response = self.client.post(
reverse("create_ssh_credential"),
{
"name": "bad-key",
"private_key": "PRIVATE KEY",
"public_key": "ssh-ed25519 AAAAOTHER root@backup",
"known_hosts": "",
"notes": "",
},
)
self.assertEqual(response.status_code, 200)
self.assertContains(response, "Public key does not match")
self.assertFalse(SshCredential.objects.exists())
def test_ssh_credentials_view_updates_existing_key(self) -> None:
self.client.force_login(self.staff_user)
credential = SshCredential.objects.create(name="backup-key", private_key="OLD KEY")