2026-05-19 04:57:10 +02:00
|
|
|
from __future__ import annotations
|
|
|
|
|
|
2026-05-19 14:37:38 +02:00
|
|
|
import os
|
|
|
|
|
from pathlib import Path
|
2026-05-19 04:57:10 +02:00
|
|
|
from typing import Any
|
|
|
|
|
|
2026-05-19 14:37:38 +02:00
|
|
|
from django.conf import settings
|
|
|
|
|
|
2026-05-19 04:57:10 +02:00
|
|
|
from pobsync.config.merge import build_effective_config
|
2026-05-19 14:37:38 +02:00
|
|
|
from pobsync.paths import PobsyncPaths
|
2026-05-19 04:57:10 +02:00
|
|
|
|
|
|
|
|
from .config_repository import global_config_data, host_config_data
|
2026-05-19 14:37:38 +02:00
|
|
|
from .models import GlobalConfig, HostConfig, SshCredential
|
2026-05-19 19:41:40 +02:00
|
|
|
from .ssh_keys import identity_path
|
2026-05-19 04:57:10 +02:00
|
|
|
|
|
|
|
|
|
|
|
|
|
class DjangoConfigSource:
    """Config source that assembles a host's effective config from Django-stored data."""

    def effective_config_for_host(self, host: str) -> dict[str, Any]:
        """Return the merged configuration for *host*.

        The global and host-level config data are merged with
        ``build_effective_config``. When ``_credential_for_host`` yields a
        credential, its SSH options and a descriptive ``ssh_credential``
        entry are attached to the merged dict before it is returned.
        """
        global_data = global_config_data()
        host_data = host_config_data(host)
        merged = build_effective_config(global_data, host_data)

        ssh_credential = _credential_for_host(host)
        if ssh_credential is None:
            # Nothing to attach: the merged config is returned untouched.
            return merged

        _attach_credential_options(merged, ssh_credential)
        return merged
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _credential_for_host(host: str) -> SshCredential | None:
    """Pick the SSH credential to use for *host*.

    The enabled ``HostConfig`` row for the host wins when it references a
    credential; otherwise the ``default_ssh_credential`` of the
    ``GlobalConfig`` row named ``"default"`` is returned (which may be unset).

    Both lookups use ``.get()``, so a missing row is not turned into ``None``
    here; whatever ``.get()`` raises propagates to the caller.
    """
    host_row = HostConfig.objects.select_related("ssh_credential").get(host=host, enabled=True)
    if not host_row.ssh_credential_id:
        # No host-specific credential: fall back to the global default.
        defaults = GlobalConfig.objects.select_related("default_ssh_credential").get(name="default")
        return defaults.default_ssh_credential
    return host_row.ssh_credential
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _attach_credential_options(config: dict[str, Any], credential: SshCredential) -> None:
    """Add the credential's SSH options to *config* in place.

    The credential is first materialized on disk. ``-o`` options pointing at
    the resulting files are then appended to ``config["ssh"]["options"]``,
    but only for keywords the existing options do not already set. Finally a
    ``config["ssh_credential"]`` summary dict is written.
    """
    ssh_section = config.setdefault("ssh", {})
    # Work on a copy so the original list object is replaced, not mutated.
    ssh_options = list(ssh_section.get("options") or [])
    materialized = _materialize_credential(credential)

    def add_unless_present(keyword: str, value: str) -> None:
        # Options already present in the config take precedence over the credential's.
        if not _has_ssh_option(ssh_options, keyword):
            ssh_options.append(f"-o{keyword}={value}")

    add_unless_present("IdentityFile", materialized["identity_file"])

    pinned_known_hosts = materialized.get("known_hosts")
    if pinned_known_hosts:
        add_unless_present("UserKnownHostsFile", pinned_known_hosts)

    learned_known_hosts = materialized.get("accept_new_known_hosts")
    if learned_known_hosts:
        # Without pinned host keys, use the accept-new known_hosts file
        # and let ssh accept keys of hosts it has not seen before.
        add_unless_present("UserKnownHostsFile", learned_known_hosts)
        add_unless_present("StrictHostKeyChecking", "accept-new")

    ssh_section["options"] = ssh_options

    storage = "filesystem" if credential.key_path else "database"
    config["ssh_credential"] = {
        "id": credential.pk,
        "name": credential.name,
        "identity_file": materialized["identity_file"],
        "generated": credential.generated,
        "storage": storage,
    }
|
2026-05-19 14:37:38 +02:00
|
|
|
|
|
|
|
|
|
|
|
|
|
def _write_private_file(path: Path, content: str) -> None:
    """Write *content* to *path* without ever exposing it to group/other.

    The file is created with mode 0600 instead of being written first and
    chmod'ed afterwards, so a freshly created file never carries the
    umask-default permissions while it holds secret material. A file that
    already exists is truncated by the open and tightened to 0600 before
    the new content is written.
    """
    fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w", encoding="utf-8") as handle:
        # The mode passed to os.open only applies on creation (and is
        # filtered by the umask), so set it explicitly while still empty.
        os.chmod(path, 0o600)
        handle.write(content)


def _materialize_credential(credential: SshCredential) -> dict[str, str]:
    """Ensure the credential's files exist on disk and return their paths.

    Returns a dict that always contains ``"identity_file"`` and exactly one
    of:

    * ``"known_hosts"`` - a per-credential file holding the credential's own
      known-hosts text, when that text is non-blank;
    * ``"accept_new_known_hosts"`` - a ``known_hosts`` file directly under
      the state directory, created empty if missing, when the credential has
      no known-hosts text.

    All files are forced to mode 0600 and the credential directory to 0700.
    """
    paths = PobsyncPaths(home=Path(settings.POBSYNC_HOME))
    credential_dir = paths.state_dir / "ssh-credentials" / str(credential.pk)
    credential_dir.mkdir(mode=0o700, parents=True, exist_ok=True)
    # mkdir's mode is ignored for an existing directory, so enforce it.
    os.chmod(credential_dir, 0o700)

    # NOTE(review): identity_path() is defined in .ssh_keys; where it points
    # for database-stored keys is not visible from this module.
    identity_file = identity_path(credential)
    if credential.key_path:
        # Key already lives on the filesystem: only tighten its permissions.
        os.chmod(identity_file, 0o600)
    else:
        # Key is stored on the credential record: write it out securely.
        _write_private_file(identity_file, _with_trailing_newline(credential.private_key))

    result = {"identity_file": str(identity_file)}

    if credential.known_hosts.strip():
        known_hosts = credential_dir / "known_hosts"
        _write_private_file(known_hosts, _with_trailing_newline(credential.known_hosts))
        result["known_hosts"] = str(known_hosts)
    else:
        known_hosts = paths.state_dir / "known_hosts"
        known_hosts.parent.mkdir(mode=0o700, parents=True, exist_ok=True)
        known_hosts.touch(mode=0o600, exist_ok=True)
        # touch() does not change the mode of an existing file.
        os.chmod(known_hosts, 0o600)
        result["accept_new_known_hosts"] = str(known_hosts)

    return result
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _has_ssh_option(options: list[str], name: str) -> bool:
|
|
|
|
|
prefix = f"-o{name}="
|
|
|
|
|
spaced = f"-o{name} "
|
|
|
|
|
return any(option == name or option.startswith(prefix) or option.startswith(spaced) for option in options)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _with_trailing_newline(value: str) -> str:
|
|
|
|
|
return value if value.endswith("\n") else f"{value}\n"
|