diff --git a/README.md b/README.md index 2409d94..3c5bb58 100644 --- a/README.md +++ b/README.md @@ -154,9 +154,12 @@ From a fresh checkout or the existing app directory: ``` git pull -sudo scripts/install-systemd --app-dir /opt/pobsync/app +sudo scripts/install-systemd --non-interactive ``` +The installer preserves an existing `/etc/pobsync/pobsync.env` unless you pass `--force-env`. It refreshes the installed +app, Python dependencies, migrations, static files, and systemd services. + Then check: ``` diff --git a/src/pobsync_backend/forms.py b/src/pobsync_backend/forms.py index a812925..e5f11bb 100644 --- a/src/pobsync_backend/forms.py +++ b/src/pobsync_backend/forms.py @@ -147,12 +147,19 @@ class ManualBackupForm(forms.Form): class SshCredentialForm(forms.ModelForm): private_key = forms.CharField( - widget=forms.Textarea, - help_text="Private key used by the worker container for SSH backups.", + widget=forms.Textarea(attrs={"spellcheck": "false", "autocomplete": "off"}), + help_text=( + "Paste the complete unencrypted OpenSSH private key, including BEGIN/END lines. " + "Use the matching public key in the field below only as a cross-check." + ), + ) + public_key = forms.CharField( + widget=forms.Textarea(attrs={"spellcheck": "false", "autocomplete": "off"}), + required=False, + help_text="Optional. If set, pobsync verifies it matches the private key.", ) - public_key = forms.CharField(widget=forms.Textarea, required=False) known_hosts = forms.CharField( - widget=forms.Textarea, + widget=forms.Textarea(attrs={"spellcheck": "false", "autocomplete": "off"}), required=False, help_text="Optional known_hosts entries. When set, StrictHostKeyChecking can stay enabled.", ) @@ -163,14 +170,24 @@ class SshCredentialForm(forms.ModelForm): fields = ("name", "private_key", "public_key", "known_hosts", "notes") def clean_private_key(self) -> str: - private_key = self.cleaned_data["private_key"].strip() + private_key = normalize_private_key(self.cleaned_data["private_key"]) public_key = validate_ssh_private_key(private_key) self.derived_public_key = public_key return f"{private_key}\n" def clean(self): cleaned_data = super().clean() - if cleaned_data.get("private_key") and not cleaned_data.get("public_key") and hasattr(self, "derived_public_key"): + provided_public_key = normalize_public_key(cleaned_data.get("public_key", "")) + if provided_public_key: + cleaned_data["public_key"] = provided_public_key + + if cleaned_data.get("private_key") and provided_public_key and hasattr(self, "derived_public_key"): + if public_key_identity(provided_public_key) != public_key_identity(self.derived_public_key): + self.add_error( + "public_key", + forms.ValidationError("Public key does not match the supplied private key."), + ) + elif cleaned_data.get("private_key") and not provided_public_key and hasattr(self, "derived_public_key"): cleaned_data["public_key"] = self.derived_public_key return cleaned_data @@ -220,7 +237,27 @@ class ScheduleConfigForm(forms.ModelForm): return cron_expr +def normalize_private_key(private_key: str) -> str: + return private_key.replace("\r\n", "\n").replace("\r", "\n").strip().lstrip("\ufeff") + + +def normalize_public_key(public_key: str) -> str: + return " ".join(public_key.strip().split()) + + +def public_key_identity(public_key: str) -> str: + parts = normalize_public_key(public_key).split() + if len(parts) >= 2: + return " ".join(parts[:2]) + return normalize_public_key(public_key) + + def validate_ssh_private_key(private_key: str) -> str: + if "BEGIN OPENSSH PRIVATE KEY" not in private_key: + if private_key.strip().startswith(("ssh-ed25519 ", "ssh-rsa ", "ecdsa-sha2-", "sk-")): + raise forms.ValidationError("This looks like a public key. Paste the private key in this field.") + raise forms.ValidationError("Invalid SSH private key: missing OpenSSH private key header.") + with TemporaryDirectory() as tmp: key_path = Path(tmp) / "identity" key_path.write_text(f"{private_key}\n", encoding="utf-8") @@ -242,8 +279,14 @@ def validate_ssh_private_key(private_key: str) -> str: if result.returncode != 0: message = result.stderr.strip() or "OpenSSH could not read this private key." - if "passphrase" in message.lower(): + lower_message = message.lower() + if "passphrase" in lower_message: message = "Encrypted SSH private keys are not supported for unattended backups." + elif "libcrypto" in lower_message: + message = ( + "OpenSSH could not parse this key. It is usually incomplete, corrupted while copying, " + "or not an unencrypted OpenSSH private key." + ) raise forms.ValidationError(f"Invalid SSH private key: {message}") public_key = result.stdout.strip() diff --git a/src/pobsync_backend/tests/test_views.py b/src/pobsync_backend/tests/test_views.py index 5aa0223..2ff8d58 100644 --- a/src/pobsync_backend/tests/test_views.py +++ b/src/pobsync_backend/tests/test_views.py @@ -135,6 +135,43 @@ class ViewTests(TestCase): self.assertContains(response, "Invalid SSH private key") self.assertFalse(SshCredential.objects.exists()) + def test_ssh_credentials_view_rejects_public_key_in_private_key_field(self) -> None: + self.client.force_login(self.staff_user) + + response = self.client.post( + reverse("create_ssh_credential"), + { + "name": "bad-key", + "private_key": "ssh-ed25519 AAAATEST root@backup", + "public_key": "", + "known_hosts": "", + "notes": "", + }, + ) + + self.assertEqual(response.status_code, 200) + self.assertContains(response, "This looks like a public key") + self.assertFalse(SshCredential.objects.exists()) + + def test_ssh_credentials_view_rejects_mismatched_public_key(self) -> None: + self.client.force_login(self.staff_user) + + with patch("pobsync_backend.forms.validate_ssh_private_key", return_value="ssh-ed25519 AAAADERIVED derived"): + response = self.client.post( + reverse("create_ssh_credential"), + { + "name": "bad-key", + "private_key": "PRIVATE KEY", + "public_key": "ssh-ed25519 AAAAOTHER root@backup", + "known_hosts": "", + "notes": "", + }, + ) + + self.assertEqual(response.status_code, 200) + self.assertContains(response, "Public key does not match") + self.assertFalse(SshCredential.objects.exists()) + def test_ssh_credentials_view_updates_existing_key(self) -> None: self.client.force_login(self.staff_user) credential = SshCredential.objects.create(name="backup-key", private_key="OLD KEY")