(bugfix) Normalize pasted OpenSSH private keys
Canonicalize uploaded OpenSSH private keys before validation by normalizing line endings, removing whitespace from the base64 body, and re-wrapping it between the BEGIN and END markers. Add SSH credential tests that generate a real ed25519 key, damage its wrapping, and verify that validation succeeds after normalization. Return a clearer validation error for PEM private keys, which are not supported by the current credential flow.
This commit is contained in:
@@ -1,6 +1,7 @@
|
|||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
import os
|
import os
|
||||||
|
import textwrap
|
||||||
import subprocess
|
import subprocess
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from tempfile import TemporaryDirectory
|
from tempfile import TemporaryDirectory
|
||||||
@@ -238,7 +239,20 @@ class ScheduleConfigForm(forms.ModelForm):
|
|||||||
|
|
||||||
|
|
||||||
def normalize_private_key(private_key: str) -> str:
|
def normalize_private_key(private_key: str) -> str:
|
||||||
return private_key.replace("\r\n", "\n").replace("\r", "\n").strip().lstrip("\ufeff")
|
normalized = private_key.replace("\r\n", "\n").replace("\r", "\n").strip().lstrip("\ufeff")
|
||||||
|
|
||||||
|
begin_marker = "-----BEGIN OPENSSH PRIVATE KEY-----"
|
||||||
|
end_marker = "-----END OPENSSH PRIVATE KEY-----"
|
||||||
|
if begin_marker in normalized and end_marker in normalized:
|
||||||
|
before_body, after_begin = normalized.split(begin_marker, 1)
|
||||||
|
body, after_end = after_begin.split(end_marker, 1)
|
||||||
|
if before_body.strip() or after_end.strip():
|
||||||
|
return normalized
|
||||||
|
compact_body = "".join(body.split())
|
||||||
|
wrapped_body = "\n".join(textwrap.wrap(compact_body, width=70))
|
||||||
|
return f"{begin_marker}\n{wrapped_body}\n{end_marker}"
|
||||||
|
|
||||||
|
return normalized
|
||||||
|
|
||||||
|
|
||||||
def normalize_public_key(public_key: str) -> str:
|
def normalize_public_key(public_key: str) -> str:
|
||||||
@@ -254,8 +268,13 @@ def public_key_identity(public_key: str) -> str:
|
|||||||
|
|
||||||
def validate_ssh_private_key(private_key: str) -> str:
|
def validate_ssh_private_key(private_key: str) -> str:
|
||||||
if "BEGIN OPENSSH PRIVATE KEY" not in private_key:
|
if "BEGIN OPENSSH PRIVATE KEY" not in private_key:
|
||||||
if private_key.strip().startswith(("ssh-ed25519 ", "ssh-rsa ", "ecdsa-sha2-", "sk-")):
|
stripped = private_key.strip()
|
||||||
|
if stripped.startswith(("ssh-ed25519 ", "ssh-rsa ", "ecdsa-sha2-", "sk-")):
|
||||||
raise forms.ValidationError("This looks like a public key. Paste the private key in this field.")
|
raise forms.ValidationError("This looks like a public key. Paste the private key in this field.")
|
||||||
|
if "BEGIN RSA PRIVATE KEY" in stripped or "BEGIN EC PRIVATE KEY" in stripped:
|
||||||
|
raise forms.ValidationError(
|
||||||
|
"PEM private keys are not supported here yet. Convert it to an unencrypted OpenSSH key first."
|
||||||
|
)
|
||||||
raise forms.ValidationError("Invalid SSH private key: missing OpenSSH private key header.")
|
raise forms.ValidationError("Invalid SSH private key: missing OpenSSH private key header.")
|
||||||
|
|
||||||
with TemporaryDirectory() as tmp:
|
with TemporaryDirectory() as tmp:
|
||||||
|
|||||||
40
src/pobsync_backend/tests/test_ssh_credentials.py
Normal file
40
src/pobsync_backend/tests/test_ssh_credentials.py
Normal file
@@ -0,0 +1,40 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import subprocess
|
||||||
|
from pathlib import Path
|
||||||
|
from tempfile import TemporaryDirectory
|
||||||
|
|
||||||
|
from django import forms
|
||||||
|
from django.test import SimpleTestCase
|
||||||
|
|
||||||
|
from pobsync_backend.forms import normalize_private_key, validate_ssh_private_key
|
||||||
|
|
||||||
|
|
||||||
|
class SshCredentialValidationTests(SimpleTestCase):
|
||||||
|
def test_normalize_private_key_repairs_wrapped_openssh_body(self) -> None:
|
||||||
|
with TemporaryDirectory() as tmp:
|
||||||
|
key_path = Path(tmp) / "identity"
|
||||||
|
subprocess.run(
|
||||||
|
["ssh-keygen", "-t", "ed25519", "-N", "", "-C", "test", "-f", str(key_path)],
|
||||||
|
check=True,
|
||||||
|
stdout=subprocess.PIPE,
|
||||||
|
stderr=subprocess.PIPE,
|
||||||
|
text=True,
|
||||||
|
)
|
||||||
|
private_key = key_path.read_text(encoding="utf-8")
|
||||||
|
|
||||||
|
begin_marker = "-----BEGIN OPENSSH PRIVATE KEY-----"
|
||||||
|
end_marker = "-----END OPENSSH PRIVATE KEY-----"
|
||||||
|
body = private_key.split(begin_marker, 1)[1].split(end_marker, 1)[0]
|
||||||
|
damaged_body = " \n ".join(body.split())
|
||||||
|
damaged_key = f"{begin_marker}\n{damaged_body}\n{end_marker}"
|
||||||
|
|
||||||
|
normalized_key = normalize_private_key(damaged_key)
|
||||||
|
|
||||||
|
self.assertEqual(validate_ssh_private_key(normalized_key), validate_ssh_private_key(private_key))
|
||||||
|
|
||||||
|
def test_validate_private_key_rejects_pem_key_with_actionable_message(self) -> None:
|
||||||
|
with self.assertRaises(forms.ValidationError) as exc:
|
||||||
|
validate_ssh_private_key("-----BEGIN RSA PRIVATE KEY-----\nabc\n-----END RSA PRIVATE KEY-----")
|
||||||
|
|
||||||
|
self.assertIn("PEM private keys are not supported", str(exc.exception))
|
||||||
Reference in New Issue
Block a user