(bugfix) Use service-level known_hosts for generated SSH keys
When a selected SSH credential has no pinned known_hosts entries, create and use a pobsync service-level known_hosts file under POBSYNC_HOME/state. Pass UserKnownHostsFile and StrictHostKeyChecking=accept-new to SSH so unattended backups no longer depend on root's known_hosts or an interactive shell session. Keep pinned credential known_hosts behavior unchanged when entries are configured explicitly.
This commit is contained in:
@@ -40,6 +40,11 @@ def _attach_credential_options(config: dict[str, Any], credential: SshCredential
|
|||||||
options.append(f"-oIdentityFile={paths['identity_file']}")
|
options.append(f"-oIdentityFile={paths['identity_file']}")
|
||||||
if paths.get("known_hosts") and not _has_ssh_option(options, "UserKnownHostsFile"):
|
if paths.get("known_hosts") and not _has_ssh_option(options, "UserKnownHostsFile"):
|
||||||
options.append(f"-oUserKnownHostsFile={paths['known_hosts']}")
|
options.append(f"-oUserKnownHostsFile={paths['known_hosts']}")
|
||||||
|
if paths.get("accept_new_known_hosts"):
|
||||||
|
if not _has_ssh_option(options, "UserKnownHostsFile"):
|
||||||
|
options.append(f"-oUserKnownHostsFile={paths['accept_new_known_hosts']}")
|
||||||
|
if not _has_ssh_option(options, "StrictHostKeyChecking"):
|
||||||
|
options.append("-oStrictHostKeyChecking=accept-new")
|
||||||
ssh["options"] = options
|
ssh["options"] = options
|
||||||
config["ssh_credential"] = {
|
config["ssh_credential"] = {
|
||||||
"id": credential.pk,
|
"id": credential.pk,
|
||||||
@@ -69,6 +74,12 @@ def _materialize_credential(credential: SshCredential) -> dict[str, str]:
|
|||||||
known_hosts.write_text(_with_trailing_newline(credential.known_hosts), encoding="utf-8")
|
known_hosts.write_text(_with_trailing_newline(credential.known_hosts), encoding="utf-8")
|
||||||
os.chmod(known_hosts, 0o600)
|
os.chmod(known_hosts, 0o600)
|
||||||
result["known_hosts"] = str(known_hosts)
|
result["known_hosts"] = str(known_hosts)
|
||||||
|
else:
|
||||||
|
known_hosts = paths.state_dir / "known_hosts"
|
||||||
|
known_hosts.parent.mkdir(mode=0o700, parents=True, exist_ok=True)
|
||||||
|
known_hosts.touch(mode=0o600, exist_ok=True)
|
||||||
|
os.chmod(known_hosts, 0o600)
|
||||||
|
result["accept_new_known_hosts"] = str(known_hosts)
|
||||||
return result
|
return result
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -69,8 +69,8 @@ def collect_host_checks(host: HostConfig, global_config: GlobalConfig | None = N
|
|||||||
SelfCheck(
|
SelfCheck(
|
||||||
"Host known_hosts",
|
"Host known_hosts",
|
||||||
"warning",
|
"warning",
|
||||||
"Selected credential has no known_hosts entries.",
|
"Selected credential has no pinned known_hosts entries.",
|
||||||
"Use Scan SSH host key before queueing unattended backups.",
|
"pobsync will use service-level StrictHostKeyChecking=accept-new on first connect.",
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|||||||
@@ -90,6 +90,7 @@ class DjangoConfigSourceTests(TestCase):
|
|||||||
self.assertIn("-oBatchMode=yes", cfg["ssh"]["options"])
|
self.assertIn("-oBatchMode=yes", cfg["ssh"]["options"])
|
||||||
self.assertIn(f"-oIdentityFile={identity_file}", cfg["ssh"]["options"])
|
self.assertIn(f"-oIdentityFile={identity_file}", cfg["ssh"]["options"])
|
||||||
self.assertIn(f"-oUserKnownHostsFile={known_hosts}", cfg["ssh"]["options"])
|
self.assertIn(f"-oUserKnownHostsFile={known_hosts}", cfg["ssh"]["options"])
|
||||||
|
self.assertNotIn("-oStrictHostKeyChecking=accept-new", cfg["ssh"]["options"])
|
||||||
self.assertEqual(cfg["ssh_credential"]["storage"], "database")
|
self.assertEqual(cfg["ssh_credential"]["storage"], "database")
|
||||||
|
|
||||||
def test_host_ssh_credential_overrides_global_credential(self) -> None:
|
def test_host_ssh_credential_overrides_global_credential(self) -> None:
|
||||||
@@ -135,3 +136,25 @@ class DjangoConfigSourceTests(TestCase):
|
|||||||
|
|
||||||
self.assertIn(f"-oIdentityFile={identity_file}", cfg["ssh"]["options"])
|
self.assertIn(f"-oIdentityFile={identity_file}", cfg["ssh"]["options"])
|
||||||
self.assertEqual(cfg["ssh_credential"]["storage"], "filesystem")
|
self.assertEqual(cfg["ssh_credential"]["storage"], "filesystem")
|
||||||
|
|
||||||
|
def test_missing_known_hosts_uses_service_accept_new_file(self) -> None:
|
||||||
|
with TemporaryDirectory() as tmp:
|
||||||
|
identity_file = Path(tmp) / "identity"
|
||||||
|
identity_file.write_text("PRIVATE KEY\n", encoding="utf-8")
|
||||||
|
identity_file.chmod(0o600)
|
||||||
|
credential = SshCredential.objects.create(name="backup-key", key_path=str(identity_file), public_key="PUBLIC")
|
||||||
|
GlobalConfig.objects.create(
|
||||||
|
name="default",
|
||||||
|
backup_root="/backups",
|
||||||
|
pobsync_home="/opt/pobsync",
|
||||||
|
default_ssh_credential=credential,
|
||||||
|
)
|
||||||
|
HostConfig.objects.create(host="web-01", address="web-01.example.test")
|
||||||
|
|
||||||
|
with override_settings(POBSYNC_HOME=str(Path(tmp) / "home")):
|
||||||
|
cfg = DjangoConfigSource().effective_config_for_host("web-01")
|
||||||
|
service_known_hosts = Path(tmp) / "home" / "state" / "known_hosts"
|
||||||
|
|
||||||
|
self.assertTrue(service_known_hosts.exists())
|
||||||
|
self.assertIn(f"-oUserKnownHostsFile={service_known_hosts}", cfg["ssh"]["options"])
|
||||||
|
self.assertIn("-oStrictHostKeyChecking=accept-new", cfg["ssh"]["options"])
|
||||||
|
|||||||
Reference in New Issue
Block a user