diff --git a/src/cockpit/beiboot.py b/src/cockpit/beiboot.py index f3ec30dff4c2..6a0bd81cb602 100644 --- a/src/cockpit/beiboot.py +++ b/src/cockpit/beiboot.py @@ -36,7 +36,7 @@ from cockpit.jsonutil import JsonObject, get_str from cockpit.packages import Packages, PackagesLoader, patch_libexecdir from cockpit.peer import Peer -from cockpit.protocol import CockpitProblem +from cockpit.protocol import CockpitProblem, CockpitProtocolError from cockpit.router import Router, RoutingRule from cockpit.transports import StdioTransport @@ -130,10 +130,25 @@ class AuthorizeResponder(ferny.AskpassHandler): commands = ('ferny.askpass', 'cockpit.report-exists') router: Router - def __init__(self, router: Router): + def __init__(self, router: Router, basic_password: Optional[str]): self.router = router + self.basic_password = basic_password + self.have_basic_password = bool(basic_password) async def do_askpass(self, messages: str, prompt: str, hint: str) -> Optional[str]: + logger.debug("AuthorizeResponder: prompt %r, messages %r, hint %r", prompt, messages, hint) + + if self.have_basic_password and 'password:' in prompt.lower(): + # with our NumberOfPasswordPrompts=1 ssh should never actually ask us more than once; assert that + if self.basic_password is None: + raise CockpitProtocolError( + f"ssh asked for password a second time, but we already sent it; prompt: {messages}") + + logger.debug("AuthorizeResponder: sending Basic auth password for prompt %r", prompt) + reply = self.basic_password + self.basic_password = None + return reply + if hint == 'none': # We have three problems here: # @@ -272,7 +287,7 @@ async def connect_from_bastion_host(self) -> None: async def boot(self, cmd: Sequence[str], env: Sequence[str], basic_password: 'str | None' = None) -> None: beiboot_helper = BridgeBeibootHelper(self) - agent = ferny.InteractionAgent([AuthorizeResponder(self.router), beiboot_helper]) + agent = ferny.InteractionAgent([AuthorizeResponder(self.router, basic_password), beiboot_helper]) logger.debug("Launching command: cmd=%s env=%s", cmd, env) transport = await self.spawn(cmd, env, stderr=agent, start_new_session=True) diff --git a/test/browser/run-test.sh b/test/browser/run-test.sh index f862ca344cc1..b1f6f8eb963d 100644 --- a/test/browser/run-test.sh +++ b/test/browser/run-test.sh @@ -91,6 +91,7 @@ if [ "$PLAN" = "main" ]; then TestLogin.testFailingWebsocketSafari TestLogin.testFailingWebsocketSafariNoCA TestLogin.testLogging + TestLogin.testLoginSshBeiboot TestLogin.testRaw TestLogin.testServer TestLogin.testUnsupportedBrowser diff --git a/test/verify/check-static-login b/test/verify/check-static-login index a83552d6bc4f..f326d800f13a 100755 --- a/test/verify/check-static-login +++ b/test/verify/check-static-login @@ -954,6 +954,64 @@ matchrule = ^DC=LAN,DC=COCKPIT,CN=alice$ # by IPv6 address and port check_server("[::1]:22", expect_fp_ack=True) + # check cockpit-beiboot as replacement of cockpit-ssh on the login page + # once that becomes the default, TestMultiMachine* and the other TestLogin* cover this + @testlib.skipImage("needs pybridge", "rhel-8*", "centos-8*") + # enable this once our cockpit/ws container can beiboot + @testlib.skipOstree("client setup does not work with ws container") + def testLoginSshBeiboot(self): + m = self.machine + b = self.browser + + # this matches our bots test VMs + my_ip = "172.27.0.15" + m.write("/etc/cockpit/cockpit.conf", """ +[Ssh-Login] +Command = /usr/bin/env python3 -m cockpit.beiboot +""", append=True) + m.start_cockpit() + + def try_login(user, password, server=None): + b.open("/") + b.set_val('#login-user-input', user) + b.set_val('#login-password-input', password) + b.click("#show-other-login-options") + b.set_val("#server-field", server or my_ip) + b.click("#login-button") + # ack unknown host key; FIXME: this should be a proper authorize message, not a prompt + b.wait_in_text("#conversation-prompt", "authenticity of host") + b.set_val("#conversation-input", "yes") + b.click("#login-button") + + def check_no_processes(): + m.execute(f"while pgrep -af '[s]sh .* {my_ip}' >&2; do sleep 1; done") + m.execute("while pgrep -af '[c]ockpit.beiboot' >&2; do sleep 1; done") + + def check_session(): + b.wait_visible('#content') + b.enter_page('/system') + b.wait_visible('.system-information') + m.execute(f"pgrep -af '[s]sh .* -l admin .*{my_ip}'") + m.execute(f"pgrep -af '[c]ockpit.beiboot.*{my_ip}'") + b.logout() + check_no_processes() + + # successful login through SSH + try_login("admin", "foobar") + check_session() + + # wrong password + try_login("admin", "wrong") + b.wait_in_text("#login-error-message", "Authentication failed") + check_no_processes() + # goes back to normal login form + b.wait_visible('#login-user-input') + + # colliding usernames; user names in "Connect to:" are *not* supported, + # but pin down the behaviour + try_login("admin", "foobar", server=f"other@{my_ip}") + check_session() + if __name__ == '__main__': testlib.test_main()