From c7cfb603b0b1d7e03d8646290cb631b14e6fb2cd Mon Sep 17 00:00:00 2001 From: Peter van Arkel Date: Tue, 19 May 2026 18:35:39 +0200 Subject: [PATCH] (bugfix) Improve SSH credential validation feedback Normalize pasted private keys before validation and detect common SSH credential mistakes, including public keys pasted into the private key field and public keys that do not match the supplied private key. Translate OpenSSH libcrypto parse failures into a clearer user-facing message and disable browser spellcheck/autocomplete on SSH key fields. Document the native update flow as git pull followed by the non-interactive installer so deployments refresh cleanly. --- README.md | 5 ++- src/pobsync_backend/forms.py | 57 ++++++++++++++++++++++--- src/pobsync_backend/tests/test_views.py | 37 ++++++++++++++++ 3 files changed, 91 insertions(+), 8 deletions(-) diff --git a/README.md b/README.md index 2409d94..3c5bb58 100644 --- a/README.md +++ b/README.md @@ -154,9 +154,12 @@ From a fresh checkout or the existing app directory: ``` git pull -sudo scripts/install-systemd --app-dir /opt/pobsync/app +sudo scripts/install-systemd --non-interactive ``` +The installer preserves an existing `/etc/pobsync/pobsync.env` unless you pass `--force-env`. It refreshes the installed +app, Python dependencies, migrations, static files, and systemd services. + Then check: ``` diff --git a/src/pobsync_backend/forms.py b/src/pobsync_backend/forms.py index a812925..e5f11bb 100644 --- a/src/pobsync_backend/forms.py +++ b/src/pobsync_backend/forms.py @@ -147,12 +147,19 @@ class ManualBackupForm(forms.Form): class SshCredentialForm(forms.ModelForm): private_key = forms.CharField( - widget=forms.Textarea, - help_text="Private key used by the worker container for SSH backups.", + widget=forms.Textarea(attrs={"spellcheck": "false", "autocomplete": "off"}), + help_text=( + "Paste the complete unencrypted OpenSSH private key, including BEGIN/END lines. " + "Use the matching public key in the field below only as a cross-check." + ), + ) + public_key = forms.CharField( + widget=forms.Textarea(attrs={"spellcheck": "false", "autocomplete": "off"}), + required=False, + help_text="Optional. If set, pobsync verifies it matches the private key.", ) - public_key = forms.CharField(widget=forms.Textarea, required=False) known_hosts = forms.CharField( - widget=forms.Textarea, + widget=forms.Textarea(attrs={"spellcheck": "false", "autocomplete": "off"}), required=False, help_text="Optional known_hosts entries. When set, StrictHostKeyChecking can stay enabled.", ) @@ -163,14 +170,24 @@ class SshCredentialForm(forms.ModelForm): fields = ("name", "private_key", "public_key", "known_hosts", "notes") def clean_private_key(self) -> str: - private_key = self.cleaned_data["private_key"].strip() + private_key = normalize_private_key(self.cleaned_data["private_key"]) public_key = validate_ssh_private_key(private_key) self.derived_public_key = public_key return f"{private_key}\n" def clean(self): cleaned_data = super().clean() - if cleaned_data.get("private_key") and not cleaned_data.get("public_key") and hasattr(self, "derived_public_key"): + provided_public_key = normalize_public_key(cleaned_data.get("public_key", "")) + if provided_public_key: + cleaned_data["public_key"] = provided_public_key + + if cleaned_data.get("private_key") and provided_public_key and hasattr(self, "derived_public_key"): + if public_key_identity(provided_public_key) != public_key_identity(self.derived_public_key): + self.add_error( + "public_key", + forms.ValidationError("Public key does not match the supplied private key."), + ) + elif cleaned_data.get("private_key") and not provided_public_key and hasattr(self, "derived_public_key"): cleaned_data["public_key"] = self.derived_public_key return cleaned_data @@ -220,7 +237,27 @@ class ScheduleConfigForm(forms.ModelForm): return cron_expr +def normalize_private_key(private_key: str) -> str: + return private_key.replace("\r\n", "\n").replace("\r", "\n").strip().lstrip("\ufeff") + + +def normalize_public_key(public_key: str) -> str: + return " ".join(public_key.strip().split()) + + +def public_key_identity(public_key: str) -> str: + parts = normalize_public_key(public_key).split() + if len(parts) >= 2: + return " ".join(parts[:2]) + return normalize_public_key(public_key) + + def validate_ssh_private_key(private_key: str) -> str: + if "BEGIN OPENSSH PRIVATE KEY" not in private_key: + if private_key.strip().startswith(("ssh-ed25519 ", "ssh-rsa ", "ecdsa-sha2-", "sk-")): + raise forms.ValidationError("This looks like a public key. Paste the private key in this field.") + raise forms.ValidationError("Invalid SSH private key: missing OpenSSH private key header.") + with TemporaryDirectory() as tmp: key_path = Path(tmp) / "identity" key_path.write_text(f"{private_key}\n", encoding="utf-8") @@ -242,8 +279,14 @@ def validate_ssh_private_key(private_key: str) -> str: if result.returncode != 0: message = result.stderr.strip() or "OpenSSH could not read this private key." - if "passphrase" in message.lower(): + lower_message = message.lower() + if "passphrase" in lower_message: message = "Encrypted SSH private keys are not supported for unattended backups." + elif "libcrypto" in lower_message: + message = ( + "OpenSSH could not parse this key. It is usually incomplete, corrupted while copying, " + "or not an unencrypted OpenSSH private key." + ) raise forms.ValidationError(f"Invalid SSH private key: {message}") public_key = result.stdout.strip() diff --git a/src/pobsync_backend/tests/test_views.py b/src/pobsync_backend/tests/test_views.py index 5aa0223..2ff8d58 100644 --- a/src/pobsync_backend/tests/test_views.py +++ b/src/pobsync_backend/tests/test_views.py @@ -135,6 +135,43 @@ class ViewTests(TestCase): self.assertContains(response, "Invalid SSH private key") self.assertFalse(SshCredential.objects.exists()) + def test_ssh_credentials_view_rejects_public_key_in_private_key_field(self) -> None: + self.client.force_login(self.staff_user) + + response = self.client.post( + reverse("create_ssh_credential"), + { + "name": "bad-key", + "private_key": "ssh-ed25519 AAAATEST root@backup", + "public_key": "", + "known_hosts": "", + "notes": "", + }, + ) + + self.assertEqual(response.status_code, 200) + self.assertContains(response, "This looks like a public key") + self.assertFalse(SshCredential.objects.exists()) + + def test_ssh_credentials_view_rejects_mismatched_public_key(self) -> None: + self.client.force_login(self.staff_user) + + with patch("pobsync_backend.forms.validate_ssh_private_key", return_value="ssh-ed25519 AAAADERIVED derived"): + response = self.client.post( + reverse("create_ssh_credential"), + { + "name": "bad-key", + "private_key": "PRIVATE KEY", + "public_key": "ssh-ed25519 AAAAOTHER root@backup", + "known_hosts": "", + "notes": "", + }, + ) + + self.assertEqual(response.status_code, 200) + self.assertContains(response, "Public key does not match") + self.assertFalse(SshCredential.objects.exists()) + def test_ssh_credentials_view_updates_existing_key(self) -> None: self.client.force_login(self.staff_user) credential = SshCredential.objects.create(name="backup-key", private_key="OLD KEY")