From 97797c574d33f3749678171c5758b567a63331e7 Mon Sep 17 00:00:00 2001 From: Peter van Arkel Date: Tue, 19 May 2026 18:42:02 +0200 Subject: [PATCH] (bugfix) Normalize pasted OpenSSH private keys Canonicalize uploaded OpenSSH private keys before validation by normalizing line endings, removing whitespace from the base64 body, and re-wrapping it between the BEGIN and END markers. Add SSH credential tests that generate a real ed25519 key, damage its wrapping, and verify that validation succeeds after normalization. Return a clearer validation error for PEM private keys, which are not supported by the current credential flow. --- src/pobsync_backend/forms.py | 23 ++++++++++- .../tests/test_ssh_credentials.py | 40 +++++++++++++++++++ 2 files changed, 61 insertions(+), 2 deletions(-) create mode 100644 src/pobsync_backend/tests/test_ssh_credentials.py diff --git a/src/pobsync_backend/forms.py b/src/pobsync_backend/forms.py index e5f11bb..7802dda 100644 --- a/src/pobsync_backend/forms.py +++ b/src/pobsync_backend/forms.py @@ -1,6 +1,7 @@ from __future__ import annotations import os +import textwrap import subprocess from pathlib import Path from tempfile import TemporaryDirectory @@ -238,7 +239,20 @@ class ScheduleConfigForm(forms.ModelForm): def normalize_private_key(private_key: str) -> str: - return private_key.replace("\r\n", "\n").replace("\r", "\n").strip().lstrip("\ufeff") + normalized = private_key.replace("\r\n", "\n").replace("\r", "\n").strip().lstrip("\ufeff") + + begin_marker = "-----BEGIN OPENSSH PRIVATE KEY-----" + end_marker = "-----END OPENSSH PRIVATE KEY-----" + if begin_marker in normalized and end_marker in normalized: + before_body, after_begin = normalized.split(begin_marker, 1) + body, after_end = after_begin.split(end_marker, 1) + if before_body.strip() or after_end.strip(): + return normalized + compact_body = "".join(body.split()) + wrapped_body = "\n".join(textwrap.wrap(compact_body, width=70)) + return f"{begin_marker}\n{wrapped_body}\n{end_marker}" + + return normalized def normalize_public_key(public_key: str) -> str: @@ -254,8 +268,13 @@ def public_key_identity(public_key: str) -> str: def validate_ssh_private_key(private_key: str) -> str: if "BEGIN OPENSSH PRIVATE KEY" not in private_key: - if private_key.strip().startswith(("ssh-ed25519 ", "ssh-rsa ", "ecdsa-sha2-", "sk-")): + stripped = private_key.strip() + if stripped.startswith(("ssh-ed25519 ", "ssh-rsa ", "ecdsa-sha2-", "sk-")): raise forms.ValidationError("This looks like a public key. Paste the private key in this field.") + if "BEGIN RSA PRIVATE KEY" in stripped or "BEGIN EC PRIVATE KEY" in stripped: + raise forms.ValidationError( + "PEM private keys are not supported here yet. Convert it to an unencrypted OpenSSH key first." + ) raise forms.ValidationError("Invalid SSH private key: missing OpenSSH private key header.") with TemporaryDirectory() as tmp: diff --git a/src/pobsync_backend/tests/test_ssh_credentials.py b/src/pobsync_backend/tests/test_ssh_credentials.py new file mode 100644 index 0000000..760a4c3 --- /dev/null +++ b/src/pobsync_backend/tests/test_ssh_credentials.py @@ -0,0 +1,40 @@ +from __future__ import annotations + +import subprocess +from pathlib import Path +from tempfile import TemporaryDirectory + +from django import forms +from django.test import SimpleTestCase + +from pobsync_backend.forms import normalize_private_key, validate_ssh_private_key + + +class SshCredentialValidationTests(SimpleTestCase): + def test_normalize_private_key_repairs_wrapped_openssh_body(self) -> None: + with TemporaryDirectory() as tmp: + key_path = Path(tmp) / "identity" + subprocess.run( + ["ssh-keygen", "-t", "ed25519", "-N", "", "-C", "test", "-f", str(key_path)], + check=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + ) + private_key = key_path.read_text(encoding="utf-8") + + begin_marker = "-----BEGIN OPENSSH PRIVATE KEY-----" + end_marker = "-----END OPENSSH PRIVATE KEY-----" + body = private_key.split(begin_marker, 1)[1].split(end_marker, 1)[0] + damaged_body = " \n ".join(body.split()) + damaged_key = f"{begin_marker}\n{damaged_body}\n{end_marker}" + + normalized_key = normalize_private_key(damaged_key) + + self.assertEqual(validate_ssh_private_key(normalized_key), validate_ssh_private_key(private_key)) + + def test_validate_private_key_rejects_pem_key_with_actionable_message(self) -> None: + with self.assertRaises(forms.ValidationError) as exc: + validate_ssh_private_key("-----BEGIN RSA PRIVATE KEY-----\nabc\n-----END RSA PRIVATE KEY-----") + + self.assertIn("PEM private keys are not supported", str(exc.exception))