Files
pobsync/src/pobsync_backend/forms.py

296 lines
11 KiB
Python
Raw Normal View History

from __future__ import annotations
import os
import subprocess
from pathlib import Path
from tempfile import TemporaryDirectory
from django import forms
from django.conf import settings
from .models import GlobalConfig, HostConfig, ScheduleConfig, SshCredential
from .scheduler import parse_cron_expr
class NewlineListField(forms.CharField):
widget = forms.Textarea
def __init__(self, *args, **kwargs) -> None:
kwargs.setdefault("required", False)
super().__init__(*args, **kwargs)
def prepare_value(self, value):
if isinstance(value, list):
return "\n".join(str(item) for item in value)
return value
def to_python(self, value) -> list[str]:
if not value:
return []
if isinstance(value, list):
return [str(item).strip() for item in value if str(item).strip()]
return [line.strip() for line in str(value).splitlines() if line.strip()]
class NullableNewlineListField(NewlineListField):
def to_python(self, value) -> list[str] | None:
parsed = super().to_python(value)
return parsed or None
class HostConfigForm(forms.ModelForm):
includes = NewlineListField(help_text="One include path per line. Leave empty to include defaults.")
excludes_add = NewlineListField(help_text="One additional exclude pattern per line.")
excludes_replace = NullableNewlineListField(
help_text="Optional. When set, replaces global excludes; one pattern per line."
)
rsync_extra_args = NewlineListField(help_text="One extra rsync argument per line.")
class Meta:
model = HostConfig
fields = (
"address",
"enabled",
"ssh_credential",
"ssh_user",
"ssh_port",
"source_root",
"includes",
"excludes_add",
"excludes_replace",
"rsync_extra_args",
"retention_daily",
"retention_weekly",
"retention_monthly",
"retention_yearly",
)
help_texts = {
"ssh_credential": "Optional. Overrides the global SSH credential for this host.",
"ssh_user": "Leave empty to use the global SSH user.",
"ssh_port": "Leave empty to use the global SSH port.",
"source_root": "Leave empty to use the global default source root.",
}
class CreateHostConfigForm(HostConfigForm):
class Meta(HostConfigForm.Meta):
fields = ("host", *HostConfigForm.Meta.fields)
help_texts = {
**HostConfigForm.Meta.help_texts,
"host": "Stable internal host name used for backup paths.",
}
class GlobalConfigForm(forms.ModelForm):
ssh_options = NewlineListField(help_text="One SSH option per line.")
rsync_args = NewlineListField(help_text="One default rsync argument per line.")
rsync_extra_args = NewlineListField(help_text="One extra rsync argument per line.")
excludes_default = NewlineListField(help_text="One default exclude pattern per line.")
class Meta:
model = GlobalConfig
fields = (
"name",
"default_ssh_credential",
"ssh_user",
"ssh_port",
"ssh_options",
"rsync_binary",
"rsync_args",
"rsync_extra_args",
"rsync_timeout_seconds",
"rsync_bwlimit_kbps",
"default_source_root",
"default_destination_subdir",
"excludes_default",
"retention_daily",
"retention_weekly",
"retention_monthly",
"retention_yearly",
)
help_texts = {
"name": "Usually 'default'. The backup engine currently reads the default config.",
"default_ssh_credential": "Optional. Used by hosts without their own SSH credential.",
"default_source_root": "Used by hosts without a custom source root.",
"default_destination_subdir": "Optional subdirectory below each snapshot.",
}
def save(self, commit: bool = True):
instance = super().save(commit=False)
instance.backup_root = settings.POBSYNC_BACKUP_ROOT
instance.pobsync_home = settings.POBSYNC_HOME
if commit:
instance.save()
self.save_m2m()
return instance
class ManualBackupForm(forms.Form):
dry_run = forms.BooleanField(
label="Dry run",
required=False,
initial=True,
help_text="Queue rsync in dry-run mode without writing a snapshot.",
)
prune = forms.BooleanField(
label="Apply retention after success",
required=False,
help_text="Apply retention after a successful non-dry-run backup.",
)
prune_max_delete = forms.IntegerField(label="Retention max delete", min_value=0, initial=10)
prune_protect_bases = forms.BooleanField(
label="Protect base snapshots",
required=False,
help_text="Keep snapshots that are used as bases by other snapshots.",
)
class SshCredentialForm(forms.ModelForm):
private_key = forms.CharField(
widget=forms.Textarea(attrs={"spellcheck": "false", "autocomplete": "off"}),
help_text=(
"Paste the complete unencrypted OpenSSH private key, including BEGIN/END lines. "
"Use the matching public key in the field below only as a cross-check."
),
)
public_key = forms.CharField(
widget=forms.Textarea(attrs={"spellcheck": "false", "autocomplete": "off"}),
required=False,
help_text="Optional. If set, pobsync verifies it matches the private key.",
)
known_hosts = forms.CharField(
widget=forms.Textarea(attrs={"spellcheck": "false", "autocomplete": "off"}),
required=False,
help_text="Optional known_hosts entries. When set, StrictHostKeyChecking can stay enabled.",
)
notes = forms.CharField(widget=forms.Textarea, required=False)
class Meta:
model = SshCredential
fields = ("name", "private_key", "public_key", "known_hosts", "notes")
def clean_private_key(self) -> str:
private_key = normalize_private_key(self.cleaned_data["private_key"])
public_key = validate_ssh_private_key(private_key)
self.derived_public_key = public_key
return f"{private_key}\n"
def clean(self):
cleaned_data = super().clean()
provided_public_key = normalize_public_key(cleaned_data.get("public_key", ""))
if provided_public_key:
cleaned_data["public_key"] = provided_public_key
if cleaned_data.get("private_key") and provided_public_key and hasattr(self, "derived_public_key"):
if public_key_identity(provided_public_key) != public_key_identity(self.derived_public_key):
self.add_error(
"public_key",
forms.ValidationError("Public key does not match the supplied private key."),
)
elif cleaned_data.get("private_key") and not provided_public_key and hasattr(self, "derived_public_key"):
cleaned_data["public_key"] = self.derived_public_key
return cleaned_data
class RetentionApplyForm(forms.Form):
kind = forms.ChoiceField(choices=(("scheduled", "Scheduled"), ("manual", "Manual"), ("all", "All")))
protect_bases = forms.BooleanField(required=False)
max_delete = forms.IntegerField(min_value=0, initial=10)
confirm_host = forms.CharField()
def __init__(self, *args, host_name: str, **kwargs) -> None:
self.host_name = host_name
super().__init__(*args, **kwargs)
self.fields["confirm_host"].help_text = f"Type {host_name} to confirm deletion."
def clean_confirm_host(self) -> str:
value = self.cleaned_data["confirm_host"].strip()
if value != self.host_name:
raise forms.ValidationError(f"Type {self.host_name} to confirm.")
return value
class ScheduleConfigForm(forms.ModelForm):
cron_expr = forms.CharField(
label="Cron expression",
help_text='Five-field cron expression, for example "15 2 * * *".',
)
prune_max_delete = forms.IntegerField(min_value=0)
class Meta:
model = ScheduleConfig
fields = (
"cron_expr",
"user",
"enabled",
"prune",
"prune_max_delete",
"prune_protect_bases",
)
def clean_cron_expr(self) -> str:
cron_expr = self.cleaned_data["cron_expr"].strip()
try:
parse_cron_expr(cron_expr)
except ValueError as exc:
raise forms.ValidationError(str(exc)) from exc
return cron_expr
def normalize_private_key(private_key: str) -> str:
return private_key.replace("\r\n", "\n").replace("\r", "\n").strip().lstrip("\ufeff")
def normalize_public_key(public_key: str) -> str:
return " ".join(public_key.strip().split())
def public_key_identity(public_key: str) -> str:
parts = normalize_public_key(public_key).split()
if len(parts) >= 2:
return " ".join(parts[:2])
return normalize_public_key(public_key)
def validate_ssh_private_key(private_key: str) -> str:
if "BEGIN OPENSSH PRIVATE KEY" not in private_key:
if private_key.strip().startswith(("ssh-ed25519 ", "ssh-rsa ", "ecdsa-sha2-", "sk-")):
raise forms.ValidationError("This looks like a public key. Paste the private key in this field.")
raise forms.ValidationError("Invalid SSH private key: missing OpenSSH private key header.")
with TemporaryDirectory() as tmp:
key_path = Path(tmp) / "identity"
key_path.write_text(f"{private_key}\n", encoding="utf-8")
os.chmod(key_path, 0o600)
try:
result = subprocess.run(
["ssh-keygen", "-y", "-f", str(key_path)],
check=False,
stdin=subprocess.DEVNULL,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
timeout=5,
)
except FileNotFoundError as exc:
raise forms.ValidationError("ssh-keygen is not available in this container.") from exc
except subprocess.TimeoutExpired as exc:
raise forms.ValidationError("Could not validate SSH private key before timeout.") from exc
if result.returncode != 0:
message = result.stderr.strip() or "OpenSSH could not read this private key."
lower_message = message.lower()
if "passphrase" in lower_message:
message = "Encrypted SSH private keys are not supported for unattended backups."
elif "libcrypto" in lower_message:
message = (
"OpenSSH could not parse this key. It is usually incomplete, corrupted while copying, "
"or not an unencrypted OpenSSH private key."
)
raise forms.ValidationError(f"Invalid SSH private key: {message}")
public_key = result.stdout.strip()
if not public_key:
raise forms.ValidationError("Invalid SSH private key: no public key could be derived.")
return public_key