Files
pobsync/src/pobsync_backend/forms.py

253 lines
8.9 KiB
Python
Raw Normal View History

from __future__ import annotations
import os
import subprocess
from pathlib import Path
from tempfile import TemporaryDirectory
from django import forms
from django.conf import settings
from .models import GlobalConfig, HostConfig, ScheduleConfig, SshCredential
from .scheduler import parse_cron_expr
class NewlineListField(forms.CharField):
    """Textarea-backed field whose Python value is a list of strings.

    Each non-blank line of the submitted text becomes one list entry, with
    surrounding whitespace removed. The field is optional unless the caller
    passes ``required`` explicitly.
    """

    widget = forms.Textarea

    def __init__(self, *args, **kwargs) -> None:
        # Only supply the default when the caller did not choose a value.
        if "required" not in kwargs:
            kwargs["required"] = False
        super().__init__(*args, **kwargs)

    def prepare_value(self, value):
        """Render a list as newline-separated text; pass anything else through."""
        if not isinstance(value, list):
            return value
        return "\n".join(map(str, value))

    def to_python(self, value) -> list[str]:
        """Return the non-blank, stripped entries of *value* as a list."""
        if not value:
            return []
        # A list is taken item by item; any other value is split into lines.
        candidates = value if isinstance(value, list) else str(value).splitlines()
        stripped = (str(entry).strip() for entry in candidates)
        return [entry for entry in stripped if entry]
class NullableNewlineListField(NewlineListField):
    """Variant of ``NewlineListField`` that yields ``None`` when no entries remain."""

    def to_python(self, value) -> list[str] | None:
        """Return the parsed entries, or ``None`` if the parsed list is empty."""
        entries = super().to_python(value)
        if not entries:
            return None
        return entries
class HostConfigForm(forms.ModelForm):
    """Model form for editing a ``HostConfig``.

    The list-valued settings are edited as text areas with one entry per line.
    """

    # List-valued settings, one entry per line.
    includes = NewlineListField(
        help_text="One include path per line. Leave empty to include defaults.",
    )
    excludes_add = NewlineListField(
        help_text="One additional exclude pattern per line.",
    )
    excludes_replace = NullableNewlineListField(
        help_text="Optional. When set, replaces global excludes; one pattern per line.",
    )
    rsync_extra_args = NewlineListField(
        help_text="One extra rsync argument per line.",
    )

    class Meta:
        model = HostConfig
        fields = (
            # Connection
            "address",
            "enabled",
            "ssh_credential",
            "ssh_user",
            "ssh_port",
            # What to copy
            "source_root",
            "includes",
            "excludes_add",
            "excludes_replace",
            "rsync_extra_args",
            # Retention
            "retention_daily",
            "retention_weekly",
            "retention_monthly",
            "retention_yearly",
        )
        help_texts = {
            "ssh_credential": "Optional. Overrides the global SSH credential for this host.",
            "ssh_user": "Leave empty to use the global SSH user.",
            "ssh_port": "Leave empty to use the global SSH port.",
            "source_root": "Leave empty to use the global default source root.",
        }
class CreateHostConfigForm(HostConfigForm):
    """``HostConfigForm`` with an additional leading ``host`` field."""

    class Meta(HostConfigForm.Meta):
        # Same fields as the parent form, with "host" placed first.
        fields = ("host",) + HostConfigForm.Meta.fields
        help_texts = dict(
            HostConfigForm.Meta.help_texts,
            host="Stable internal host name used for backup paths.",
        )
class GlobalConfigForm(forms.ModelForm):
    """Model form for editing a ``GlobalConfig``.

    ``backup_root`` and ``pobsync_home`` are not form fields; ``save`` always
    sets them from the Django settings.
    """

    # List-valued settings, one entry per line.
    ssh_options = NewlineListField(
        help_text="One SSH option per line.",
    )
    rsync_args = NewlineListField(
        help_text="One default rsync argument per line.",
    )
    rsync_extra_args = NewlineListField(
        help_text="One extra rsync argument per line.",
    )
    excludes_default = NewlineListField(
        help_text="One default exclude pattern per line.",
    )

    class Meta:
        model = GlobalConfig
        fields = (
            "name",
            # SSH
            "default_ssh_credential",
            "ssh_user",
            "ssh_port",
            "ssh_options",
            # rsync
            "rsync_binary",
            "rsync_args",
            "rsync_extra_args",
            "rsync_timeout_seconds",
            "rsync_bwlimit_kbps",
            # Paths and excludes
            "default_source_root",
            "default_destination_subdir",
            "excludes_default",
            # Retention
            "retention_daily",
            "retention_weekly",
            "retention_monthly",
            "retention_yearly",
        )
        help_texts = {
            "name": "Usually 'default'. The backup engine currently reads the default config.",
            "default_ssh_credential": "Optional. Used by hosts without their own SSH credential.",
            "default_source_root": "Used by hosts without a custom source root.",
            "default_destination_subdir": "Optional subdirectory below each snapshot.",
        }

    def save(self, commit: bool = True):
        """Save the config, forcing the settings-controlled paths onto it.

        With ``commit=False`` the unsaved instance is returned after the
        paths have been applied.
        """
        config = super().save(commit=False)
        config.backup_root = settings.POBSYNC_BACKUP_ROOT
        config.pobsync_home = settings.POBSYNC_HOME
        if not commit:
            return config
        config.save()
        self.save_m2m()
        return config
class ManualBackupForm(forms.Form):
    """Options collected when a backup is started by hand."""

    # Pre-checked by default.
    dry_run = forms.BooleanField(
        required=False,
        initial=True,
        label="Dry run",
        help_text="Queue rsync in dry-run mode without writing a snapshot.",
    )

    # Retention options.
    prune = forms.BooleanField(
        required=False,
        label="Apply retention after success",
        help_text="Apply retention after a successful non-dry-run backup.",
    )
    prune_max_delete = forms.IntegerField(
        min_value=0,
        initial=10,
        label="Retention max delete",
    )
    prune_protect_bases = forms.BooleanField(
        required=False,
        label="Protect base snapshots",
        help_text="Keep snapshots that are used as bases by other snapshots.",
    )
class SshCredentialForm(forms.ModelForm):
    """Model form for creating or editing an ``SshCredential``.

    The private key is checked with ``validate_ssh_private_key``. When the
    public key field is left empty, it is filled with the public key derived
    during that check.
    """

    private_key = forms.CharField(
        widget=forms.Textarea,
        help_text="Private key used by the worker container for SSH backups.",
    )
    public_key = forms.CharField(widget=forms.Textarea, required=False)
    known_hosts = forms.CharField(
        widget=forms.Textarea,
        required=False,
        help_text="Optional known_hosts entries. When set, StrictHostKeyChecking can stay enabled.",
    )
    notes = forms.CharField(widget=forms.Textarea, required=False)

    class Meta:
        model = SshCredential
        fields = ("name", "private_key", "public_key", "known_hosts", "notes")

    def clean_private_key(self) -> str:
        """Normalize and validate the private key.

        Returns:
            The key text with LF line endings and exactly one trailing newline.

        Raises:
            forms.ValidationError: propagated from ``validate_ssh_private_key``
                when the key cannot be used.
        """
        submitted = self.cleaned_data["private_key"]
        # Browsers submit textarea content with CRLF line endings. Stripping
        # only the ends would leave "\r" inside the key that is validated and
        # stored, so convert every CRLF / lone CR to LF first.
        private_key = submitted.replace("\r\n", "\n").replace("\r", "\n").strip()
        # Remember the derived public key so clean() can use it as a fallback.
        self.derived_public_key = validate_ssh_private_key(private_key)
        return f"{private_key}\n"

    def clean(self):
        """Fill an empty public key with the one derived from the private key."""
        cleaned_data = super().clean()
        # derived_public_key only exists if clean_private_key succeeded.
        if (
            cleaned_data.get("private_key")
            and not cleaned_data.get("public_key")
            and hasattr(self, "derived_public_key")
        ):
            cleaned_data["public_key"] = self.derived_public_key
        return cleaned_data
class RetentionApplyForm(forms.Form):
    """Form for applying retention to one host, guarded by typing its name.

    The caller must pass ``host_name`` as a keyword argument; the
    ``confirm_host`` field only validates when it matches that name.
    """

    kind = forms.ChoiceField(
        choices=(
            ("scheduled", "Scheduled"),
            ("manual", "Manual"),
            ("all", "All"),
        ),
    )
    protect_bases = forms.BooleanField(required=False)
    max_delete = forms.IntegerField(min_value=0, initial=10)
    confirm_host = forms.CharField()

    def __init__(self, *args, host_name: str, **kwargs) -> None:
        self.host_name = host_name
        super().__init__(*args, **kwargs)
        # The fields mapping exists only after the base initializer has run.
        confirm_field = self.fields["confirm_host"]
        confirm_field.help_text = f"Type {host_name} to confirm deletion."

    def clean_confirm_host(self) -> str:
        """Return the typed host name, rejecting anything but an exact match."""
        typed = self.cleaned_data["confirm_host"].strip()
        if typed == self.host_name:
            return typed
        raise forms.ValidationError(f"Type {self.host_name} to confirm.")
class ScheduleConfigForm(forms.ModelForm):
    """Model form for editing a ``ScheduleConfig``.

    The cron expression is checked with ``parse_cron_expr`` before saving.
    """

    cron_expr = forms.CharField(
        help_text='Five-field cron expression, for example "15 2 * * *".',
        label="Cron expression",
    )
    prune_max_delete = forms.IntegerField(min_value=0)

    class Meta:
        model = ScheduleConfig
        fields = (
            "cron_expr",
            "user",
            "enabled",
            "prune",
            "prune_max_delete",
            "prune_protect_bases",
        )

    def clean_cron_expr(self) -> str:
        """Return the stripped expression, or raise if it cannot be parsed."""
        expression = self.cleaned_data["cron_expr"].strip()
        try:
            parse_cron_expr(expression)
        except ValueError as exc:
            # Surface the parser's own message as the form error.
            raise forms.ValidationError(str(exc)) from exc
        else:
            return expression
def validate_ssh_private_key(private_key: str) -> str:
    """Validate *private_key* with ``ssh-keygen`` and return its public key.

    The key text is written to a file in a temporary directory and passed to
    ``ssh-keygen -y``; the command's standard output is returned, stripped.

    Raises:
        forms.ValidationError: if ``ssh-keygen`` cannot be found, does not
            finish within 5 seconds, exits with a non-zero status, or
            produces no output.
    """
    with TemporaryDirectory() as workdir:
        identity_file = Path(workdir) / "identity"
        identity_file.write_text(f"{private_key}\n", encoding="utf-8")
        # Owner-only permissions; presumably required for ssh-keygen to
        # accept the file as a private key -- TODO confirm.
        os.chmod(identity_file, 0o600)
        command = ["ssh-keygen", "-y", "-f", str(identity_file)]
        try:
            completed = subprocess.run(
                command,
                stdin=subprocess.DEVNULL,  # never wait on interactive input
                capture_output=True,
                text=True,
                timeout=5,
                check=False,
            )
        except FileNotFoundError as exc:
            raise forms.ValidationError("ssh-keygen is not available in this container.") from exc
        except subprocess.TimeoutExpired as exc:
            raise forms.ValidationError("Could not validate SSH private key before timeout.") from exc

    if completed.returncode != 0:
        detail = completed.stderr.strip() or "OpenSSH could not read this private key."
        # Any mention of a passphrase is reported as an unsupported encrypted key.
        if "passphrase" in detail.lower():
            detail = "Encrypted SSH private keys are not supported for unattended backups."
        raise forms.ValidationError(f"Invalid SSH private key: {detail}")

    derived_key = completed.stdout.strip()
    if derived_key:
        return derived_key
    raise forms.ValidationError("Invalid SSH private key: no public key could be derived.")