from __future__ import annotations import os import subprocess from pathlib import Path from tempfile import TemporaryDirectory from django import forms from django.conf import settings from .models import GlobalConfig, HostConfig, ScheduleConfig, SshCredential from .scheduler import parse_cron_expr class NewlineListField(forms.CharField): widget = forms.Textarea def __init__(self, *args, **kwargs) -> None: kwargs.setdefault("required", False) super().__init__(*args, **kwargs) def prepare_value(self, value): if isinstance(value, list): return "\n".join(str(item) for item in value) return value def to_python(self, value) -> list[str]: if not value: return [] if isinstance(value, list): return [str(item).strip() for item in value if str(item).strip()] return [line.strip() for line in str(value).splitlines() if line.strip()] class NullableNewlineListField(NewlineListField): def to_python(self, value) -> list[str] | None: parsed = super().to_python(value) return parsed or None class HostConfigForm(forms.ModelForm): includes = NewlineListField(help_text="One include path per line. Leave empty to include defaults.") excludes_add = NewlineListField(help_text="One additional exclude pattern per line.") excludes_replace = NullableNewlineListField( help_text="Optional. When set, replaces global excludes; one pattern per line." ) rsync_extra_args = NewlineListField(help_text="One extra rsync argument per line.") class Meta: model = HostConfig fields = ( "address", "enabled", "ssh_credential", "ssh_user", "ssh_port", "source_root", "includes", "excludes_add", "excludes_replace", "rsync_extra_args", "retention_daily", "retention_weekly", "retention_monthly", "retention_yearly", ) help_texts = { "ssh_credential": "Optional. Overrides the global SSH credential for this host.", "ssh_user": "Leave empty to use the global SSH user.", "ssh_port": "Leave empty to use the global SSH port.", "source_root": "Leave empty to use the global default source root.", } class CreateHostConfigForm(HostConfigForm): class Meta(HostConfigForm.Meta): fields = ("host", *HostConfigForm.Meta.fields) help_texts = { **HostConfigForm.Meta.help_texts, "host": "Stable internal host name used for backup paths.", } class GlobalConfigForm(forms.ModelForm): ssh_options = NewlineListField(help_text="One SSH option per line.") rsync_args = NewlineListField(help_text="One default rsync argument per line.") rsync_extra_args = NewlineListField(help_text="One extra rsync argument per line.") excludes_default = NewlineListField(help_text="One default exclude pattern per line.") class Meta: model = GlobalConfig fields = ( "name", "default_ssh_credential", "ssh_user", "ssh_port", "ssh_options", "rsync_binary", "rsync_args", "rsync_extra_args", "rsync_timeout_seconds", "rsync_bwlimit_kbps", "default_source_root", "default_destination_subdir", "excludes_default", "retention_daily", "retention_weekly", "retention_monthly", "retention_yearly", ) help_texts = { "name": "Usually 'default'. The backup engine currently reads the default config.", "default_ssh_credential": "Optional. Used by hosts without their own SSH credential.", "default_source_root": "Used by hosts without a custom source root.", "default_destination_subdir": "Optional subdirectory below each snapshot.", } def save(self, commit: bool = True): instance = super().save(commit=False) instance.backup_root = settings.POBSYNC_BACKUP_ROOT instance.pobsync_home = settings.POBSYNC_HOME if commit: instance.save() self.save_m2m() return instance class ManualBackupForm(forms.Form): dry_run = forms.BooleanField( label="Dry run", required=False, initial=True, help_text="Queue rsync in dry-run mode without writing a snapshot.", ) prune = forms.BooleanField( label="Apply retention after success", required=False, help_text="Apply retention after a successful non-dry-run backup.", ) prune_max_delete = forms.IntegerField(label="Retention max delete", min_value=0, initial=10) prune_protect_bases = forms.BooleanField( label="Protect base snapshots", required=False, help_text="Keep snapshots that are used as bases by other snapshots.", ) class SshCredentialForm(forms.ModelForm): private_key = forms.CharField( widget=forms.Textarea, help_text="Private key used by the worker container for SSH backups.", ) public_key = forms.CharField(widget=forms.Textarea, required=False) known_hosts = forms.CharField( widget=forms.Textarea, required=False, help_text="Optional known_hosts entries. When set, StrictHostKeyChecking can stay enabled.", ) notes = forms.CharField(widget=forms.Textarea, required=False) class Meta: model = SshCredential fields = ("name", "private_key", "public_key", "known_hosts", "notes") def clean_private_key(self) -> str: private_key = self.cleaned_data["private_key"].strip() public_key = validate_ssh_private_key(private_key) self.derived_public_key = public_key return f"{private_key}\n" def clean(self): cleaned_data = super().clean() if cleaned_data.get("private_key") and not cleaned_data.get("public_key") and hasattr(self, "derived_public_key"): cleaned_data["public_key"] = self.derived_public_key return cleaned_data class RetentionApplyForm(forms.Form): kind = forms.ChoiceField(choices=(("scheduled", "Scheduled"), ("manual", "Manual"), ("all", "All"))) protect_bases = forms.BooleanField(required=False) max_delete = forms.IntegerField(min_value=0, initial=10) confirm_host = forms.CharField() def __init__(self, *args, host_name: str, **kwargs) -> None: self.host_name = host_name super().__init__(*args, **kwargs) self.fields["confirm_host"].help_text = f"Type {host_name} to confirm deletion." def clean_confirm_host(self) -> str: value = self.cleaned_data["confirm_host"].strip() if value != self.host_name: raise forms.ValidationError(f"Type {self.host_name} to confirm.") return value class ScheduleConfigForm(forms.ModelForm): cron_expr = forms.CharField( label="Cron expression", help_text='Five-field cron expression, for example "15 2 * * *".', ) prune_max_delete = forms.IntegerField(min_value=0) class Meta: model = ScheduleConfig fields = ( "cron_expr", "user", "enabled", "prune", "prune_max_delete", "prune_protect_bases", ) def clean_cron_expr(self) -> str: cron_expr = self.cleaned_data["cron_expr"].strip() try: parse_cron_expr(cron_expr) except ValueError as exc: raise forms.ValidationError(str(exc)) from exc return cron_expr def validate_ssh_private_key(private_key: str) -> str: with TemporaryDirectory() as tmp: key_path = Path(tmp) / "identity" key_path.write_text(f"{private_key}\n", encoding="utf-8") os.chmod(key_path, 0o600) try: result = subprocess.run( ["ssh-keygen", "-y", "-f", str(key_path)], check=False, stdin=subprocess.DEVNULL, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, timeout=5, ) except FileNotFoundError as exc: raise forms.ValidationError("ssh-keygen is not available in this container.") from exc except subprocess.TimeoutExpired as exc: raise forms.ValidationError("Could not validate SSH private key before timeout.") from exc if result.returncode != 0: message = result.stderr.strip() or "OpenSSH could not read this private key." if "passphrase" in message.lower(): message = "Encrypted SSH private keys are not supported for unattended backups." raise forms.ValidationError(f"Invalid SSH private key: {message}") public_key = result.stdout.strip() if not public_key: raise forms.ValidationError("Invalid SSH private key: no public key could be derived.") return public_key