"""Django forms for pobsync host, global, schedule and SSH credential configuration.

Besides the form classes, this module contains the helpers used to normalize
and validate pasted or uploaded SSH key material.
"""

from __future__ import annotations

import os
import subprocess
import textwrap
from pathlib import Path
from tempfile import TemporaryDirectory

from django import forms
from django.conf import settings

from .models import GlobalConfig, HostConfig, ScheduleConfig, SshCredential
from .scheduler import parse_cron_expr


class NewlineListField(forms.CharField):
    """Textarea-backed field that converts between a list of strings and one item per line."""

    widget = forms.Textarea

    def __init__(self, *args, **kwargs) -> None:
        """Create the field; it is optional unless the caller passes ``required`` explicitly."""
        kwargs.setdefault("required", False)
        super().__init__(*args, **kwargs)

    def prepare_value(self, value):
        """Render a stored list as newline-separated text; other values pass through unchanged."""
        if isinstance(value, list):
            return "\n".join(str(item) for item in value)
        return value

    def to_python(self, value) -> list[str]:
        """Return the non-blank, whitespace-stripped entries of ``value``.

        ``value`` may be a list (each item is stringified) or text (split on
        line boundaries). Any falsy value yields an empty list.
        """
        if not value:
            return []
        if isinstance(value, list):
            return [str(item).strip() for item in value if str(item).strip()]
        return [line.strip() for line in str(value).splitlines() if line.strip()]


class NullableNewlineListField(NewlineListField):
    """Variant of ``NewlineListField`` that reports "no entries" as ``None``."""

    def to_python(self, value) -> list[str] | None:
        """Return the parsed list, or ``None`` when it would be empty."""
        parsed = super().to_python(value)
        return parsed or None


class HostConfigForm(forms.ModelForm):
    """Edit form for a ``HostConfig``; list-valued settings are entered one per line."""

    includes = NewlineListField(help_text="One include path per line. Leave empty to include defaults.")
    excludes_add = NewlineListField(help_text="One additional exclude pattern per line.")
    excludes_replace = NullableNewlineListField(
        help_text="Optional. When set, replaces global excludes; one pattern per line."
    )
    rsync_extra_args = NewlineListField(help_text="One extra rsync argument per line.")

    class Meta:
        model = HostConfig
        fields = (
            "address",
            "enabled",
            "ssh_credential",
            "ssh_user",
            "ssh_port",
            "source_root",
            "includes",
            "excludes_add",
            "excludes_replace",
            "rsync_extra_args",
            "retention_daily",
            "retention_weekly",
            "retention_monthly",
            "retention_yearly",
        )
        help_texts = {
            "ssh_credential": "Optional. Overrides the global SSH credential for this host.",
            "ssh_user": "Leave empty to use the global SSH user.",
            "ssh_port": "Leave empty to use the global SSH port.",
            "source_root": "Leave empty to use the global default source root.",
        }


class CreateHostConfigForm(HostConfigForm):
    """Creation variant of ``HostConfigForm`` that additionally exposes the ``host`` field."""

    class Meta(HostConfigForm.Meta):
        # "host" is listed first, followed by every field of the edit form.
        fields = ("host", *HostConfigForm.Meta.fields)
        help_texts = {
            **HostConfigForm.Meta.help_texts,
            "host": "Stable internal host name used for backup paths.",
        }


class GlobalConfigForm(forms.ModelForm):
    """Edit form for a ``GlobalConfig``; list-valued settings are entered one per line."""

    ssh_options = NewlineListField(help_text="One SSH option per line.")
    rsync_args = NewlineListField(help_text="One default rsync argument per line.")
    rsync_extra_args = NewlineListField(help_text="One extra rsync argument per line.")
    excludes_default = NewlineListField(help_text="One default exclude pattern per line.")

    class Meta:
        model = GlobalConfig
        fields = (
            "name",
            "default_ssh_credential",
            "ssh_user",
            "ssh_port",
            "ssh_options",
            "rsync_binary",
            "rsync_args",
            "rsync_extra_args",
            "rsync_timeout_seconds",
            "rsync_bwlimit_kbps",
            "default_source_root",
            "default_destination_subdir",
            "excludes_default",
            "retention_daily",
            "retention_weekly",
            "retention_monthly",
            "retention_yearly",
        )
        help_texts = {
            "name": "Usually 'default'. The backup engine currently reads the default config.",
            "default_ssh_credential": "Optional. Used by hosts without their own SSH credential.",
            "default_source_root": "Used by hosts without a custom source root.",
            "default_destination_subdir": "Optional subdirectory below each snapshot.",
        }

    def save(self, commit: bool = True):
        """Save the config, forcing ``backup_root`` and ``pobsync_home`` from Django settings.

        Neither attribute is a form field; both are always overwritten with
        ``settings.POBSYNC_BACKUP_ROOT`` and ``settings.POBSYNC_HOME``.

        Args:
            commit: When true, persist the instance and its many-to-many data.

        Returns:
            The (possibly unsaved) model instance.
        """
        instance = super().save(commit=False)
        instance.backup_root = settings.POBSYNC_BACKUP_ROOT
        instance.pobsync_home = settings.POBSYNC_HOME
        if commit:
            instance.save()
            self.save_m2m()
        return instance


class ManualBackupForm(forms.Form):
    """Options for queueing a manual backup run."""

    dry_run = forms.BooleanField(
        label="Dry run",
        required=False,
        initial=True,
        help_text="Queue rsync in dry-run mode without writing a snapshot.",
    )
    verbose_output = forms.BooleanField(
        label="Verbose rsync output",
        required=False,
        help_text="Write itemized rsync changes, file-list progress, and stats to the run log. Dry-runs always use this.",
    )
    prune = forms.BooleanField(
        label="Apply retention after success",
        required=False,
        help_text="Apply retention after a successful non-dry-run backup.",
    )
    prune_max_delete = forms.IntegerField(label="Retention max delete", min_value=0, initial=10)
    prune_protect_bases = forms.BooleanField(
        label="Protect base snapshots",
        required=False,
        help_text="Keep snapshots that are used as bases by other snapshots.",
    )


class SshCredentialForm(forms.ModelForm):
    """Create or edit an ``SshCredential`` from a pasted or uploaded private key."""

    private_key_file = forms.FileField(
        required=False,
        help_text="Optional. Upload the private key file directly to avoid copy/paste formatting problems.",
    )
    private_key = forms.CharField(
        widget=forms.Textarea(attrs={"spellcheck": "false", "autocomplete": "off"}),
        required=False,
        help_text=(
            "Paste the complete unencrypted OpenSSH private key, including BEGIN/END lines. "
            "Leave empty when uploading a private key file."
        ),
    )
    public_key = forms.CharField(
        widget=forms.Textarea(attrs={"spellcheck": "false", "autocomplete": "off"}),
        required=False,
        help_text="Optional. If set, pobsync verifies it matches the private key.",
    )
    known_hosts = forms.CharField(
        widget=forms.Textarea(attrs={"spellcheck": "false", "autocomplete": "off"}),
        required=False,
        help_text="Optional known_hosts entries. When set, StrictHostKeyChecking can stay enabled.",
    )
    notes = forms.CharField(widget=forms.Textarea, required=False)

    class Meta:
        model = SshCredential
        fields = ("name", "private_key", "public_key", "known_hosts", "notes")

    def clean_private_key(self) -> str:
        """Resolve, normalize and validate the private key for this submission.

        An uploaded ``private_key_file`` takes precedence over pasted text.
        When neither supplies any content and an existing instance with a
        ``key_path`` is being edited, that instance's ``private_key`` is
        returned unchanged.

        Returns:
            The normalized private key terminated by a single newline, or the
            existing instance's key.

        Raises:
            forms.ValidationError: If the upload is not UTF-8, no key is
                available, or the key fails validation.
        """
        uploaded_file = self.files.get("private_key_file")
        if uploaded_file:
            try:
                raw_private_key = uploaded_file.read().decode("utf-8")
            except UnicodeDecodeError as exc:
                raise forms.ValidationError("SSH private key files must be UTF-8 text files.") from exc
        else:
            raw_private_key = self.cleaned_data.get("private_key", "")

        if not raw_private_key.strip():
            if self.instance and self.instance.pk and self.instance.key_path:
                return self.instance.private_key
            raise forms.ValidationError(
                "Paste a private key, upload a private key file, or generate a key from Django."
            )

        private_key = normalize_private_key(raw_private_key)
        public_key = validate_ssh_private_key(private_key)
        # Remembered so clean() can compare it with, or substitute it for, the public key field.
        self.derived_public_key = public_key
        return f"{private_key}\n"

    def clean(self):
        """Reconcile the public key field with the private key supplied in this submission.

        * A provided public key is stored in whitespace-normalized form.
        * With no provided public key, an existing instance's ``public_key`` is kept.
        * If a public key was derived in ``clean_private_key``: a provided key
          must have the same identity (otherwise a ``public_key`` field error
          is added), and a missing key is replaced by the derived one.
        """
        cleaned_data = super().clean()
        provided_public_key = normalize_public_key(cleaned_data.get("public_key", ""))
        if provided_public_key:
            cleaned_data["public_key"] = provided_public_key
        elif self.instance and self.instance.pk and self.instance.key_path:
            cleaned_data["public_key"] = self.instance.public_key

        # derived_public_key only exists when clean_private_key() validated new key material.
        if cleaned_data.get("private_key") and provided_public_key and hasattr(self, "derived_public_key"):
            if public_key_identity(provided_public_key) != public_key_identity(self.derived_public_key):
                self.add_error(
                    "public_key",
                    forms.ValidationError("Public key does not match the supplied private key."),
                )
        elif cleaned_data.get("private_key") and not provided_public_key and hasattr(self, "derived_public_key"):
            cleaned_data["public_key"] = self.derived_public_key
        return cleaned_data


class SshCredentialGenerateForm(forms.Form):
    """Options for generating a new SSH credential."""

    name = forms.CharField(max_length=128)
    key_type = forms.ChoiceField(
        choices=(("ed25519", "ed25519"), ("rsa", "rsa")),
        initial="ed25519",
        help_text="ed25519 is recommended unless you need RSA for an older target.",
    )
    set_global_default = forms.BooleanField(
        required=False,
        initial=True,
        help_text="Use this key as the global default when the default global config exists.",
    )
    known_hosts = forms.CharField(
        widget=forms.Textarea(attrs={"spellcheck": "false", "autocomplete": "off"}),
        required=False,
        help_text="Optional known_hosts entries. This can also be filled later.",
    )
    notes = forms.CharField(widget=forms.Textarea, required=False)

    def clean_name(self) -> str:
        """Return the stripped name, rejecting names already used by an ``SshCredential``."""
        name = self.cleaned_data["name"].strip()
        if SshCredential.objects.filter(name=name).exists():
            raise forms.ValidationError("An SSH credential with this name already exists.")
        return name


class RetentionApplyForm(forms.Form):
    """Confirmation form for applying retention; the user must retype the host and delete count."""

    kind = forms.ChoiceField(choices=(("scheduled", "Scheduled"), ("manual", "Manual"), ("all", "All")))
    protect_bases = forms.BooleanField(required=False)
    max_delete = forms.IntegerField(min_value=0, initial=10)
    confirm_delete_count = forms.IntegerField(min_value=0)
    confirm_host = forms.CharField()

    def __init__(self, *args, host_name: str, expected_delete_count: int | None = None, **kwargs) -> None:
        """Bind the form to the host being confirmed.

        Args:
            host_name: Value the user must type into ``confirm_host``.
            expected_delete_count: Value the user must type into
                ``confirm_delete_count``; ``None`` disables that comparison.
        """
        self.host_name = host_name
        self.expected_delete_count = expected_delete_count
        super().__init__(*args, **kwargs)
        self.fields["confirm_host"].help_text = f"Type {host_name} to confirm deletion."
        if expected_delete_count is not None:
            self.fields["confirm_delete_count"].help_text = (
                f"Type {expected_delete_count} to confirm the current number of planned deletions."
            )

    def clean_confirm_host(self) -> str:
        """Return the stripped confirmation text; it must equal ``host_name`` exactly."""
        value = self.cleaned_data["confirm_host"].strip()
        if value != self.host_name:
            raise forms.ValidationError(f"Type {self.host_name} to confirm.")
        return value

    def clean_confirm_delete_count(self) -> int:
        """Return the typed count; it must equal ``expected_delete_count`` when one was given."""
        value = self.cleaned_data["confirm_delete_count"]
        if self.expected_delete_count is not None and value != self.expected_delete_count:
            raise forms.ValidationError(f"Type {self.expected_delete_count} to confirm the delete count.")
        return value


class ScheduleConfigForm(forms.ModelForm):
    """Edit form for a ``ScheduleConfig``; the schedule expression is checked with ``parse_cron_expr``."""

    cron_expr = forms.CharField(
        label="Schedule expression",
        help_text=(
            'Five-field cron-style expression, for example "15 2 * * *". '
            "This is evaluated by the pobsync scheduler service, not host cron."
        ),
    )
    prune_max_delete = forms.IntegerField(min_value=0)

    class Meta:
        model = ScheduleConfig
        fields = (
            "cron_expr",
            "enabled",
            "prune",
            "prune_max_delete",
            "prune_protect_bases",
        )

    def clean_cron_expr(self) -> str:
        """Return the stripped expression, turning a parser ``ValueError`` into a field error."""
        cron_expr = self.cleaned_data["cron_expr"].strip()
        try:
            parse_cron_expr(cron_expr)
        except ValueError as exc:
            raise forms.ValidationError(str(exc)) from exc
        return cron_expr


def normalize_private_key(private_key: str) -> str:
    """Clean up a pasted or uploaded OpenSSH private key.

    Line endings are converted to ``\\n`` and surrounding whitespace and a
    leading byte-order mark are removed. If the text consists solely of one
    ``BEGIN``/``END OPENSSH PRIVATE KEY`` block, all whitespace inside the
    body is dropped and the body is re-wrapped at 70 characters per line.
    Anything else (missing or misordered markers, extra text around the
    block) is returned after the basic cleanup only.

    Args:
        private_key: Raw key text.

    Returns:
        The normalized key text without a trailing newline.
    """
    unified = private_key.replace("\r\n", "\n").replace("\r", "\n")
    # U+FEFF is not whitespace for str.strip(), so strip again after removing
    # it; otherwise whitespace that followed the BOM would be left in place.
    normalized = unified.strip().lstrip("\ufeff").strip()

    begin_marker = "-----BEGIN OPENSSH PRIVATE KEY-----"
    end_marker = "-----END OPENSSH PRIVATE KEY-----"

    # partition() never raises: when the END marker is missing after the BEGIN
    # marker (e.g. the two markers were pasted in the wrong order) the
    # separator comes back empty and the text is returned untouched, instead
    # of failing on tuple unpacking as a two-way split() would.
    before_body, begin_found, after_begin = normalized.partition(begin_marker)
    body, end_found, after_end = after_begin.partition(end_marker)
    if not (begin_found and end_found):
        return normalized
    if before_body.strip() or after_end.strip():
        # Extra content around the key block: leave it alone.
        return normalized

    compact_body = "".join(body.split())
    wrapped_body = "\n".join(textwrap.wrap(compact_body, width=70))
    return f"{begin_marker}\n{wrapped_body}\n{end_marker}"


def normalize_public_key(public_key: str) -> str:
    """Collapse every run of whitespace in ``public_key`` to one space and trim the ends."""
    return " ".join(public_key.strip().split())


def public_key_identity(public_key: str) -> str:
    """Return the first two whitespace-separated fields of a public key.

    Presumably these are the key type and key data, so that a trailing comment
    does not affect comparisons. A key with fewer than two fields is returned
    in normalized form.
    """
    parts = normalize_public_key(public_key).split()
    if len(parts) >= 2:
        return " ".join(parts[:2])
    return normalize_public_key(public_key)


def validate_ssh_private_key(private_key: str) -> str:
    """Validate an unencrypted OpenSSH private key and return its public key.

    The key is written to a file inside a temporary directory and passed to
    ``ssh-keygen -y -f``; the command's standard output is returned as the
    derived public key.

    Args:
        private_key: Private key text (see ``normalize_private_key``).

    Returns:
        The stripped standard output of ``ssh-keygen``.

    Raises:
        forms.ValidationError: If the OpenSSH header is missing, ``ssh-keygen``
            is unavailable, times out or rejects the key, or no public key is
            produced.
    """
    if "BEGIN OPENSSH PRIVATE KEY" not in private_key:
        stripped = private_key.strip()
        # Give targeted hints for the most common wrong inputs before the generic error.
        if stripped.startswith(("ssh-ed25519 ", "ssh-rsa ", "ecdsa-sha2-", "sk-")):
            raise forms.ValidationError("This looks like a public key. Paste the private key in this field.")
        if "BEGIN RSA PRIVATE KEY" in stripped or "BEGIN EC PRIVATE KEY" in stripped:
            raise forms.ValidationError(
                "PEM private keys are not supported here yet. Convert it to an unencrypted OpenSSH key first."
            )
        raise forms.ValidationError("Invalid SSH private key: missing OpenSSH private key header.")

    # The key file lives only for the duration of the ssh-keygen call; the
    # temporary directory and its contents are removed when the block exits.
    with TemporaryDirectory() as tmp:
        key_path = Path(tmp) / "identity"
        key_path.write_text(f"{private_key}\n", encoding="utf-8")
        os.chmod(key_path, 0o600)
        try:
            result = subprocess.run(
                ["ssh-keygen", "-y", "-f", str(key_path)],
                check=False,
                stdin=subprocess.DEVNULL,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                timeout=5,
            )
        except FileNotFoundError as exc:
            raise forms.ValidationError("ssh-keygen is not available in this container.") from exc
        except subprocess.TimeoutExpired as exc:
            raise forms.ValidationError("Could not validate SSH private key before timeout.") from exc

    if result.returncode != 0:
        message = result.stderr.strip() or "OpenSSH could not read this private key."
        lower_message = message.lower()
        # Replace known low-level ssh-keygen errors with actionable explanations.
        if "passphrase" in lower_message:
            message = "Encrypted SSH private keys are not supported for unattended backups."
        elif "libcrypto" in lower_message:
            message = (
                "OpenSSH could not parse this key. It is usually incomplete, corrupted while copying, "
                "or not an unencrypted OpenSSH private key."
            )
        raise forms.ValidationError(f"Invalid SSH private key: {message}")

    public_key = result.stdout.strip()
    if not public_key:
        raise forms.ValidationError("Invalid SSH private key: no public key could be derived.")
    return public_key