Files
pobsync/src/pobsync_backend/config_source.py

76 lines
2.8 KiB
Python
Raw Normal View History

from __future__ import annotations
import os
from pathlib import Path
from typing import Any
from django.conf import settings
from pobsync.config.merge import build_effective_config
from pobsync.paths import PobsyncPaths
from .config_repository import global_config_data, host_config_data
from .models import GlobalConfig, HostConfig, SshCredential
from .ssh_keys import identity_path
class DjangoConfigSource:
    """Config source that assembles a host's configuration from Django models."""

    def effective_config_for_host(self, host: str) -> dict[str, Any]:
        """Return the effective configuration for *host*.

        The base configuration is produced by ``build_effective_config`` from
        the global config data and the host's config data. If an SSH
        credential resolves for the host, the matching ssh options are
        attached to that configuration before it is returned.
        """
        merged = build_effective_config(global_config_data(), host_config_data(host))
        resolved_credential = _credential_for_host(host)
        if resolved_credential is None:
            # No host-specific or default credential: return the config untouched.
            return merged
        _attach_credential_options(merged, resolved_credential)
        return merged
def _credential_for_host(host: str) -> SshCredential | None:
    """Resolve the SSH credential to use for *host*.

    The enabled ``HostConfig`` row for the host is looked up first; when it
    references a credential, that credential is returned. Otherwise the
    ``default_ssh_credential`` of the ``GlobalConfig`` named ``"default"`` is
    returned, which may be ``None``.

    Both lookups use ``.get()``, so a missing row propagates the model's
    ``DoesNotExist`` exception to the caller.
    """
    host_row = HostConfig.objects.select_related("ssh_credential").get(
        host=host,
        enabled=True,
    )
    if not host_row.ssh_credential_id:
        # Host has no credential of its own: fall back to the global default.
        defaults_row = GlobalConfig.objects.select_related(
            "default_ssh_credential"
        ).get(name="default")
        return defaults_row.default_ssh_credential
    return host_row.ssh_credential
def _attach_credential_options(config: dict[str, Any], credential: SshCredential) -> None:
    """Add ssh ``-o`` options for *credential* to ``config["ssh"]["options"]``.

    *config* is mutated in place: an ``"ssh"`` section is created if absent
    and its ``"options"`` entry is replaced with a new list. Options that are
    already present in the existing list are not overridden.
    """
    ssh_section = config.setdefault("ssh", {})
    # Copy so the list stored in the incoming config is never mutated directly.
    ssh_options = list(ssh_section.get("options") or [])
    materialized = _materialize_credential(credential)

    identity_already_set = _has_ssh_option(ssh_options, "IdentityFile")
    if not identity_already_set:
        key_file = materialized["identity_file"]
        ssh_options.append(f"-oIdentityFile={key_file}")

    # "known_hosts" is only present when the credential carries known-hosts data.
    known_hosts_file = materialized.get("known_hosts")
    if known_hosts_file and not _has_ssh_option(ssh_options, "UserKnownHostsFile"):
        ssh_options.append(f"-oUserKnownHostsFile={known_hosts_file}")

    ssh_section["options"] = ssh_options
def _materialize_credential(credential: SshCredential) -> dict[str, str]:
    """Write *credential*'s key material to disk and return the file paths.

    A per-credential directory ``<state_dir>/ssh-credentials/<pk>`` is created
    with mode 0700. The identity file location comes from ``identity_path``:

    * when ``credential.key_path`` is set, the file is not written here and
      only its mode is forced to 0600 (presumably the key already exists at
      that path -- TODO confirm against ``identity_path``);
    * otherwise ``credential.private_key`` is written to it.

    If ``credential.known_hosts`` is non-blank it is written to a
    ``known_hosts`` file inside the credential directory.

    Returns:
        A dict with ``"identity_file"`` and, only when known-hosts data was
        written, ``"known_hosts"``; both values are string paths.

    Raises:
        OSError: if a directory or file cannot be created, written or chmod'ed.
    """
    paths = PobsyncPaths(home=Path(settings.POBSYNC_HOME))
    credential_dir = paths.state_dir / "ssh-credentials" / str(credential.pk)
    credential_dir.mkdir(mode=0o700, parents=True, exist_ok=True)
    # mkdir's mode is masked by the umask and is not applied to a directory
    # that already exists, so enforce it explicitly.
    os.chmod(credential_dir, 0o700)

    identity_file = identity_path(credential)
    if credential.key_path:
        os.chmod(identity_file, 0o600)
    else:
        _write_private_file(identity_file, _with_trailing_newline(credential.private_key))

    result = {"identity_file": str(identity_file)}
    if credential.known_hosts.strip():
        known_hosts = credential_dir / "known_hosts"
        _write_private_file(known_hosts, _with_trailing_newline(credential.known_hosts))
        result["known_hosts"] = str(known_hosts)
    return result


def _write_private_file(path: Path, content: str) -> None:
    """Write *content* to *path* as UTF-8 with owner-only (0600) permissions."""
    # Passing the mode to os.open means a newly created file is never readable
    # by other users, unlike write-then-chmod which leaves a short window
    # where the file has umask-default permissions.
    fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w", encoding="utf-8") as handle:
        # os.open does not change the mode of a pre-existing file, so tighten
        # it before any secret content is written.
        os.chmod(path, 0o600)
        handle.write(content)
def _has_ssh_option(options: list[str], name: str) -> bool:
prefix = f"-o{name}="
spaced = f"-o{name} "
return any(option == name or option.startswith(prefix) or option.startswith(spaced) for option in options)
def _with_trailing_newline(value: str) -> str:
return value if value.endswith("\n") else f"{value}\n"