(feature) Add host key scanning for SSH credentials

Add a host detail action that scans the target SSH host key with
ssh-keyscan and stores it on the selected SSH credential.

Merge scanned known_hosts entries without duplicates and let the
existing runtime config pass them through as UserKnownHostsFile for
unattended rsync over SSH.

Extend host checks to warn when the selected credential has no known_hosts
entries, making host key verification failures actionable from Django.
This commit is contained in:
2026-05-19 19:55:40 +02:00
parent 25d2a5b1a7
commit d3ffca1843
7 changed files with 104 additions and 1 deletions

View File

@@ -62,6 +62,17 @@ def collect_host_checks(host: HostConfig, global_config: GlobalConfig | None = N
"Generated filesystem keys are recommended for native systemd installs.",
)
)
if credential.known_hosts.strip():
checks.append(SelfCheck("Host known_hosts", "ok", "Selected credential has known_hosts entries."))
else:
checks.append(
SelfCheck(
"Host known_hosts",
"warning",
"Selected credential has no known_hosts entries.",
"Use Scan SSH host key before queueing unattended backups.",
)
)
host_root = resolve_host_root(global_config.backup_root, host.host)
checks.append(_host_path_check("Host backup root", host_root, must_exist=True, must_be_writable=True))

View File

@@ -108,3 +108,38 @@ def delete_generated_key_files(credential: SshCredential) -> None:
path.with_suffix(path.suffix + ".pub").unlink(missing_ok=True)
if path.name == "identity":
(path.parent / "identity.pub").unlink(missing_ok=True)
def scan_known_host(address: str, *, port: int = 22, timeout: int = 5) -> str:
if shutil.which("ssh-keyscan") is None:
raise SshKeyError("ssh-keyscan is not available.")
command = ["ssh-keyscan", "-T", str(timeout), "-p", str(port), address]
result = subprocess.run(
command,
check=False,
stdin=subprocess.DEVNULL,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
timeout=timeout + 2,
)
if result.returncode != 0 and not result.stdout.strip():
raise SshKeyError(result.stderr.strip() or f"Could not scan SSH host key for {address}.")
lines = [line.strip() for line in result.stdout.splitlines() if line.strip() and not line.startswith("#")]
if not lines:
raise SshKeyError(f"ssh-keyscan returned no host keys for {address}.")
return "\n".join(lines)
def merge_known_hosts(existing: str, scanned: str) -> str:
lines: list[str] = []
seen: set[str] = set()
for line in [*existing.splitlines(), *scanned.splitlines()]:
normalized = line.strip()
if not normalized or normalized in seen:
continue
seen.add(normalized)
lines.append(normalized)
return "\n".join(lines) + ("\n" if lines else "")

View File

@@ -17,6 +17,10 @@
{% csrf_token %}
<button type="submit" class="secondary">Prepare directories</button>
</form>
<form method="post" action="{% url 'scan_host_known_key' host.host %}">
{% csrf_token %}
<button type="submit" class="secondary">Scan SSH host key</button>
</form>
</section>
<section class="grid" aria-label="Host summary">

View File

@@ -10,6 +10,7 @@ from django.test import SimpleTestCase, TestCase, override_settings
from pobsync_backend.forms import normalize_private_key, validate_ssh_private_key
from pobsync_backend.models import GlobalConfig, SshCredential
from pobsync_backend.ssh_keys import merge_known_hosts
class SshCredentialValidationTests(SimpleTestCase):
@@ -60,3 +61,14 @@ class SshCredentialManagementTests(TestCase):
global_config.refresh_from_db()
self.assertEqual(global_config.default_ssh_credential.name, "default")
def test_merge_known_hosts_appends_unique_entries(self) -> None:
merged = merge_known_hosts(
"web-01.example.test ssh-ed25519 AAAAOLD\n",
"web-01.example.test ssh-ed25519 AAAAOLD\nweb-01.example.test ssh-rsa AAAANEW\n",
)
self.assertEqual(
merged,
"web-01.example.test ssh-ed25519 AAAAOLD\nweb-01.example.test ssh-rsa AAAANEW\n",
)

View File

@@ -522,6 +522,24 @@ class ViewTests(TestCase):
self.assertTrue((backup_root / host.host / "manual").is_dir())
self.assertTrue((backup_root / host.host / ".incomplete").is_dir())
def test_scan_host_known_key_action_updates_selected_credential(self) -> None:
self.client.force_login(self.staff_user)
credential = SshCredential.objects.create(name="default-key", key_path="/var/lib/pobsync/state/ssh-credentials/1/identity")
GlobalConfig.objects.create(name="default", backup_root="/backups", default_ssh_credential=credential, ssh_port=2222)
host = HostConfig.objects.create(host="web-01", address="web-01.example.test")
with patch(
"pobsync_backend.views.scan_known_host",
return_value="web-01.example.test ssh-ed25519 AAAASCANNED",
) as scan:
response = self.client.post(reverse("scan_host_known_key", args=[host.host]), follow=True)
self.assertRedirects(response, reverse("host_detail", args=[host.host]))
self.assertContains(response, "Stored SSH host key for web-01")
scan.assert_called_once_with("web-01.example.test", port=2222)
credential.refresh_from_db()
self.assertEqual(credential.known_hosts, "web-01.example.test ssh-ed25519 AAAASCANNED\n")
def test_host_detail_surfaces_active_backup_run(self) -> None:
self.client.force_login(self.staff_user)
GlobalConfig.objects.create(name="default", backup_root="/backups")

View File

@@ -30,7 +30,7 @@ from .models import BackupRun, GlobalConfig, HostConfig, ScheduleConfig, Snapsho
from .retention import run_sql_retention_apply, run_sql_retention_plan
from .self_check import collect_self_checks, summarize_self_checks
from .snapshot_discovery import discover_snapshots, inspect_snapshot_discovery
from .ssh_keys import SshKeyError, delete_generated_key_files, generate_ssh_key
from .ssh_keys import SshKeyError, delete_generated_key_files, generate_ssh_key, merge_known_hosts, scan_known_host
@staff_member_required
@@ -289,6 +289,28 @@ def prepare_host_directories(request, host: str):
return redirect("host_detail", host=host_config.host)
@staff_member_required
@require_POST
def scan_host_known_key(request, host: str):
host_config = get_object_or_404(HostConfig, host=host)
global_config = GlobalConfig.objects.filter(name="default").first()
credential = host_config.ssh_credential or (global_config.default_ssh_credential if global_config else None)
if credential is None:
messages.error(request, f"No SSH credential is selected for {host_config.host}.")
return redirect("host_detail", host=host_config.host)
port = host_config.ssh_port or (global_config.ssh_port if global_config else 22)
try:
scanned = scan_known_host(host_config.address, port=int(port or 22))
except SshKeyError as exc:
messages.error(request, f"Could not scan SSH host key for {host_config.host}: {exc}")
else:
credential.known_hosts = merge_known_hosts(credential.known_hosts, scanned)
credential.save(update_fields=["known_hosts", "updated_at"])
messages.success(request, f"Stored SSH host key for {host_config.host} on credential {credential.name}.")
return redirect("host_detail", host=host_config.host)
@staff_member_required
@require_POST
def queue_manual_backup(request, host: str):