Files
pobsync/src/pobsync_backend/ssh_keys.py

111 lines
3.4 KiB
Python
Raw Normal View History

from __future__ import annotations
import os
import shutil
import subprocess
from pathlib import Path
from django.conf import settings
from .models import SshCredential
class SshKeyError(RuntimeError):
pass
def credential_dir(credential: SshCredential) -> Path:
return Path(settings.POBSYNC_HOME) / "state" / "ssh-credentials" / str(credential.pk)
def identity_path(credential: SshCredential) -> Path:
if credential.key_path:
return Path(credential.key_path)
return credential_dir(credential) / "identity"
def generate_ssh_key(credential: SshCredential, *, key_type: str = "ed25519", force: bool = False) -> SshCredential:
if credential.pk is None:
raise SshKeyError("Credential must be saved before generating an SSH key.")
if shutil.which("ssh-keygen") is None:
raise SshKeyError("ssh-keygen is not available.")
key_dir = credential_dir(credential)
key_dir.mkdir(mode=0o700, parents=True, exist_ok=True)
os.chmod(key_dir, 0o700)
private_key = key_dir / "identity"
public_key_file = key_dir / "identity.pub"
if force:
private_key.unlink(missing_ok=True)
public_key_file.unlink(missing_ok=True)
elif private_key.exists() or public_key_file.exists():
raise SshKeyError(f"SSH key already exists for {credential.name}.")
result = subprocess.run(
[
"ssh-keygen",
"-t",
key_type,
"-N",
"",
"-C",
f"pobsync:{credential.name}",
"-f",
str(private_key),
],
check=False,
stdin=subprocess.DEVNULL,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
timeout=15,
)
if result.returncode != 0:
raise SshKeyError(result.stderr.strip() or "ssh-keygen failed.")
os.chmod(private_key, 0o600)
public_key = public_key_file.read_text(encoding="utf-8").strip()
fingerprint = fingerprint_for_key(private_key)
credential.private_key = ""
credential.public_key = public_key
credential.key_path = str(private_key)
credential.key_type = key_type
credential.fingerprint = fingerprint
credential.generated = True
credential.save(update_fields=["private_key", "public_key", "key_path", "key_type", "fingerprint", "generated", "updated_at"])
return credential
def fingerprint_for_key(private_key: Path) -> str:
result = subprocess.run(
["ssh-keygen", "-lf", str(private_key)],
check=False,
stdin=subprocess.DEVNULL,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
timeout=5,
)
if result.returncode != 0:
raise SshKeyError(result.stderr.strip() or "Could not fingerprint SSH key.")
return result.stdout.strip()
def delete_generated_key_files(credential: SshCredential) -> None:
path = identity_path(credential)
allowed_root = (Path(settings.POBSYNC_HOME) / "state" / "ssh-credentials").resolve()
try:
resolved = path.resolve()
except FileNotFoundError:
resolved = path
if allowed_root not in resolved.parents:
raise SshKeyError(f"Refusing to delete key outside {allowed_root}.")
path.unlink(missing_ok=True)
path.with_suffix(path.suffix + ".pub").unlink(missing_ok=True)
if path.name == "identity":
(path.parent / "identity.pub").unlink(missing_ok=True)