(feature) Add Django-managed SSH credentials
Add SSH credentials as first-class Django data so backup keys can be uploaded through the control panel instead of mounted into containers. Credentials can be selected globally or overridden per host. At runtime the selected key is materialized inside the container with restrictive file permissions and injected into the rsync SSH command via IdentityFile. Known hosts entries are handled the same way when configured. Add control panel views for creating and listing SSH keys, expose the fields in config forms and admin, document the workflow, and cover global and host credential selection with tests.
This commit is contained in:
@@ -1,12 +1,71 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
from django.conf import settings
|
||||
|
||||
from pobsync.config.merge import build_effective_config
|
||||
from pobsync.paths import PobsyncPaths
|
||||
|
||||
from .config_repository import global_config_data, host_config_data
|
||||
from .models import GlobalConfig, HostConfig, SshCredential
|
||||
|
||||
|
||||
class DjangoConfigSource:
|
||||
def effective_config_for_host(self, host: str) -> dict[str, Any]:
|
||||
return build_effective_config(global_config_data(), host_config_data(host))
|
||||
config = build_effective_config(global_config_data(), host_config_data(host))
|
||||
credential = _credential_for_host(host)
|
||||
if credential is not None:
|
||||
_attach_credential_options(config, credential)
|
||||
return config
|
||||
|
||||
|
||||
def _credential_for_host(host: str) -> SshCredential | None:
|
||||
host_config = HostConfig.objects.select_related("ssh_credential").get(host=host, enabled=True)
|
||||
if host_config.ssh_credential_id:
|
||||
return host_config.ssh_credential
|
||||
|
||||
global_config = GlobalConfig.objects.select_related("default_ssh_credential").get(name="default")
|
||||
return global_config.default_ssh_credential
|
||||
|
||||
|
||||
def _attach_credential_options(config: dict[str, Any], credential: SshCredential) -> None:
|
||||
ssh = config.setdefault("ssh", {})
|
||||
options = list(ssh.get("options") or [])
|
||||
paths = _materialize_credential(credential)
|
||||
if not _has_ssh_option(options, "IdentityFile"):
|
||||
options.append(f"-oIdentityFile={paths['identity_file']}")
|
||||
if paths.get("known_hosts") and not _has_ssh_option(options, "UserKnownHostsFile"):
|
||||
options.append(f"-oUserKnownHostsFile={paths['known_hosts']}")
|
||||
ssh["options"] = options
|
||||
|
||||
|
||||
def _materialize_credential(credential: SshCredential) -> dict[str, str]:
|
||||
paths = PobsyncPaths(home=Path(settings.POBSYNC_HOME))
|
||||
credential_dir = paths.state_dir / "ssh-credentials" / str(credential.pk)
|
||||
credential_dir.mkdir(mode=0o700, parents=True, exist_ok=True)
|
||||
os.chmod(credential_dir, 0o700)
|
||||
|
||||
identity_file = credential_dir / "identity"
|
||||
identity_file.write_text(_with_trailing_newline(credential.private_key), encoding="utf-8")
|
||||
os.chmod(identity_file, 0o600)
|
||||
|
||||
result = {"identity_file": str(identity_file)}
|
||||
if credential.known_hosts.strip():
|
||||
known_hosts = credential_dir / "known_hosts"
|
||||
known_hosts.write_text(_with_trailing_newline(credential.known_hosts), encoding="utf-8")
|
||||
os.chmod(known_hosts, 0o600)
|
||||
result["known_hosts"] = str(known_hosts)
|
||||
return result
|
||||
|
||||
|
||||
def _has_ssh_option(options: list[str], name: str) -> bool:
|
||||
prefix = f"-o{name}="
|
||||
spaced = f"-o{name} "
|
||||
return any(option == name or option.startswith(prefix) or option.startswith(spaced) for option in options)
|
||||
|
||||
|
||||
def _with_trailing_newline(value: str) -> str:
|
||||
return value if value.endswith("\n") else f"{value}\n"
|
||||
|
||||
Reference in New Issue
Block a user