From 8bd2a8ff1a26f1b853b74b178cbfe94b41b4d603 Mon Sep 17 00:00:00 2001 From: Peter van Arkel Date: Tue, 19 May 2026 20:01:39 +0200 Subject: [PATCH] (bugfix) Use service-level known_hosts for generated SSH keys When a selected SSH credential has no pinned known_hosts entries, create and use a pobsync service-level known_hosts file under POBSYNC_HOME/state. Pass UserKnownHostsFile and StrictHostKeyChecking=accept-new to SSH so unattended backups no longer depend on root's known_hosts or an interactive shell session. Keep pinned credential known_hosts behavior unchanged when entries are configured explicitly. --- src/pobsync_backend/config_source.py | 11 +++++++++ src/pobsync_backend/host_ops.py | 4 ++-- .../tests/test_django_config_source.py | 23 +++++++++++++++++++ 3 files changed, 36 insertions(+), 2 deletions(-) diff --git a/src/pobsync_backend/config_source.py b/src/pobsync_backend/config_source.py index ff4003e..2b2880b 100644 --- a/src/pobsync_backend/config_source.py +++ b/src/pobsync_backend/config_source.py @@ -40,6 +40,11 @@ def _attach_credential_options(config: dict[str, Any], credential: SshCredential options.append(f"-oIdentityFile={paths['identity_file']}") if paths.get("known_hosts") and not _has_ssh_option(options, "UserKnownHostsFile"): options.append(f"-oUserKnownHostsFile={paths['known_hosts']}") + if paths.get("accept_new_known_hosts"): + if not _has_ssh_option(options, "UserKnownHostsFile"): + options.append(f"-oUserKnownHostsFile={paths['accept_new_known_hosts']}") + if not _has_ssh_option(options, "StrictHostKeyChecking"): + options.append("-oStrictHostKeyChecking=accept-new") ssh["options"] = options config["ssh_credential"] = { "id": credential.pk, @@ -69,6 +74,12 @@ def _materialize_credential(credential: SshCredential) -> dict[str, str]: known_hosts.write_text(_with_trailing_newline(credential.known_hosts), encoding="utf-8") os.chmod(known_hosts, 0o600) result["known_hosts"] = str(known_hosts) + else: + known_hosts = paths.state_dir / "known_hosts" + known_hosts.parent.mkdir(mode=0o700, parents=True, exist_ok=True) + known_hosts.touch(mode=0o600, exist_ok=True) + os.chmod(known_hosts, 0o600) + result["accept_new_known_hosts"] = str(known_hosts) return result diff --git a/src/pobsync_backend/host_ops.py b/src/pobsync_backend/host_ops.py index 32fe0fb..f65c2c8 100644 --- a/src/pobsync_backend/host_ops.py +++ b/src/pobsync_backend/host_ops.py @@ -69,8 +69,8 @@ def collect_host_checks(host: HostConfig, global_config: GlobalConfig | None = N SelfCheck( "Host known_hosts", "warning", - "Selected credential has no known_hosts entries.", - "Use Scan SSH host key before queueing unattended backups.", + "Selected credential has no pinned known_hosts entries.", + "pobsync will use service-level StrictHostKeyChecking=accept-new on first connect.", ) ) diff --git a/src/pobsync_backend/tests/test_django_config_source.py b/src/pobsync_backend/tests/test_django_config_source.py index a65a431..0d78740 100644 --- a/src/pobsync_backend/tests/test_django_config_source.py +++ b/src/pobsync_backend/tests/test_django_config_source.py @@ -90,6 +90,7 @@ class DjangoConfigSourceTests(TestCase): self.assertIn("-oBatchMode=yes", cfg["ssh"]["options"]) self.assertIn(f"-oIdentityFile={identity_file}", cfg["ssh"]["options"]) self.assertIn(f"-oUserKnownHostsFile={known_hosts}", cfg["ssh"]["options"]) + self.assertNotIn("-oStrictHostKeyChecking=accept-new", cfg["ssh"]["options"]) self.assertEqual(cfg["ssh_credential"]["storage"], "database") def test_host_ssh_credential_overrides_global_credential(self) -> None: @@ -135,3 +136,25 @@ class DjangoConfigSourceTests(TestCase): self.assertIn(f"-oIdentityFile={identity_file}", cfg["ssh"]["options"]) self.assertEqual(cfg["ssh_credential"]["storage"], "filesystem") + + def test_missing_known_hosts_uses_service_accept_new_file(self) -> None: + with TemporaryDirectory() as tmp: + identity_file = Path(tmp) / "identity" + identity_file.write_text("PRIVATE KEY\n", encoding="utf-8") + identity_file.chmod(0o600) + credential = SshCredential.objects.create(name="backup-key", key_path=str(identity_file), public_key="PUBLIC") + GlobalConfig.objects.create( + name="default", + backup_root="/backups", + pobsync_home="/opt/pobsync", + default_ssh_credential=credential, + ) + HostConfig.objects.create(host="web-01", address="web-01.example.test") + + with override_settings(POBSYNC_HOME=str(Path(tmp) / "home")): + cfg = DjangoConfigSource().effective_config_for_host("web-01") + service_known_hosts = Path(tmp) / "home" / "state" / "known_hosts" + + self.assertTrue(service_known_hosts.exists()) + self.assertIn(f"-oUserKnownHostsFile={service_known_hosts}", cfg["ssh"]["options"]) + self.assertIn("-oStrictHostKeyChecking=accept-new", cfg["ssh"]["options"])