Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for pulling username from keyring subprocess provider #12748

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions news/12543.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add support for pulling username from keyring subprocess provider
24 changes: 13 additions & 11 deletions src/pip/_internal/network/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
providing credentials in the context of network requests.
"""

import json
import logging
import os
import shutil
Expand Down Expand Up @@ -115,23 +116,22 @@ def __init__(self, cmd: str) -> None:
self.keyring = cmd

def get_auth_info(self, url: str, username: Optional[str]) -> Optional[AuthInfo]:
# This is the default implementation of keyring.get_credential
# https://github.com/jaraco/keyring/blob/97689324abcf01bd1793d49063e7ca01e03d7d07/keyring/backend.py#L134-L139
if username is not None:
password = self._get_password(url, username)
if password is not None:
return username, password
return None
return self._get_creds(url, username)

def save_auth_info(self, url: str, username: str, password: str) -> None:
return self._set_password(url, username, password)

def _get_password(self, service_name: str, username: str) -> Optional[str]:
"""Mirror the implementation of keyring.get_password using cli"""
def _get_creds(
self, service_name: str, username: Optional[str]
) -> Optional[AuthInfo]:
"""Mirror the implementation of keyring.get_credential using cli"""
if self.keyring is None:
return None

cmd = [self.keyring, "get", service_name, username]
cmd = [self.keyring, "--mode=creds", "--output=json", "get", service_name]
if username is not None:
cmd.append(username)

env = os.environ.copy()
env["PYTHONIOENCODING"] = "utf-8"
res = subprocess.run(
Expand All @@ -142,7 +142,9 @@ def _get_password(self, service_name: str, username: str) -> Optional[str]:
)
if res.returncode:
return None
return res.stdout.decode("utf-8").strip(os.linesep)

data = json.loads(res.stdout.decode("utf-8"))
return (data["username"], data["password"])

def _set_password(self, service_name: str, username: str, password: str) -> None:
"""Mirror the implementation of keyring.set_password using cli"""
Expand Down
108 changes: 57 additions & 51 deletions tests/functional/test_install_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,6 @@ def test_prompt_for_keyring_if_needed(
cert_factory: CertFactory,
auth_needed: bool,
flags: List[str],
keyring_provider: str,
keyring_provider_implementation: str,
tmpdir: Path,
script_factory: ScriptFactory,
Expand All @@ -403,10 +402,12 @@ def test_prompt_for_keyring_if_needed(
"""Test behaviour while installing from an index url
requiring authentication and keyring is possible.
"""
environ = os.environ.copy()
workspace = tmpdir.joinpath("workspace")

virtualenv = virtualenv_factory(workspace.joinpath("venv"))

if keyring_provider_implementation == "subprocess":
# Install keyring into its own venv.
keyring_virtualenv = virtualenv_factory(workspace.joinpath("keyring"))
keyring_script = script_factory(
workspace.joinpath("keyring"), keyring_virtualenv
Expand All @@ -416,22 +417,32 @@ def test_prompt_for_keyring_if_needed(
"keyring",
)

environ["PATH"] = str(keyring_script.bin_path) + os.pathsep + environ["PATH"]

virtualenv = virtualenv_factory(workspace.joinpath("venv"))
script = script_factory(workspace.joinpath("venv"), virtualenv, environ=environ)

if (
keyring_provider not in [None, "auto"]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I found the pre-existing code hard to grok. This line in particular really confounds me: why does keyring_provider influence how we set up the test environment? I'd expect that to be only dependent upon keyring_provider_implementation (which is the change I've made in this PR).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After your refactoring keyring won't be importable when keyring_provider in [None, "auto"] and keyring_provider_implementation == "disabled". (first subcondition is false, second is true)
Which might be relevant because of:

        if keyring_provider_implementation == "disabled" or (
            not interactive and keyring_provider in [None, "auto"]
        ):
            request.applymarker(pytest.mark.xfail())

So the test should fail, so far so good.
I'm starting to suspect I did this so keyring would be installed when keyring_provider == "auto" and keyring_provider_implementation == "disabled" when what I actually was thinking about is the keyring_provider == "disabled" and keyring_provider_implementation == "import" case.

Maybe it is an idea to ask for verbose output and check for the Keyring provider set: lines to make that more explicit? Although, that maght be better as a seperate test.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After your refactoring keyring won't be importable when keyring_provider in [None, "auto"] and keyring_provider_implementation == "disabled".

Yes, this was intentional. Perhaps I'm misunderstanding these variable names? I read them as:

  • keyring_provider_implementation: this is the way i'm going to set up my environment for this test (is keyring installed in this venv, is is discoverable via PATH in a different venv, or is it not installed at all)
  • keyring_provider: this is the keyring provider i'm going to request when i invoke pip

So, it's intentional that if keyring_provider_implementation == "disabled", then yeah, keyring won't be imprtable.

But again, it's very possible I'm misunderstanding the intent of these tests. Guidance appreciated.

Maybe it is an idea to ask for verbose output and check for the Keyring provider set: lines to make that more explicit?

Yeah, I personally not in love with this elaborate usage of parameterize + xfail. Tests can fail for all sorts of reasons other than the expected reason! I think it would be better to assert on the expected failures. (IMO, that would be better to do in a separate PR)

or keyring_provider_implementation != "subprocess"
):
script.pip(
# Set up this venv with a PATH that can see the keyring installed in a
# separate venv.
virtualenv_script = script_factory(
workspace.joinpath("venv"),
virtualenv,
environ={
**os.environ,
"PATH": str(keyring_script.bin_path) + os.pathsep + os.environ["PATH"],
},
)
elif keyring_provider_implementation == "import":
# Set up a venv with keyring installed.
virtualenv_script = script_factory(workspace.joinpath("venv"), virtualenv)
virtualenv_script.pip(
"install",
"keyring",
)

if keyring_provider_implementation != "subprocess":
keyring_script = script
keyring_script = virtualenv_script
elif keyring_provider_implementation == "disabled":
# Set up an venv that does not have keyring installed, nor is able to
# find keyring anywhere on the PATH.
virtualenv_script = script_factory(workspace.joinpath("venv"), virtualenv)
keyring_script = None
pass
else:
pytest.fail(f"Unrecognized {keyring_provider_implementation=}")

cert_path = cert_factory()
ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
Expand All @@ -454,42 +465,42 @@ def test_prompt_for_keyring_if_needed(

url = f"https://USERNAME@{server.host}:{server.port}/simple"

keyring_content = textwrap.dedent(
"""\
import os
import sys
import keyring
from keyring.backend import KeyringBackend
from keyring.credentials import SimpleCredential
if keyring_script is not None:
keyring_content = textwrap.dedent(
"""\
import os
import sys
import keyring
from keyring.backend import KeyringBackend
from keyring.credentials import SimpleCredential

class TestBackend(KeyringBackend):
priority = 1
class TestBackend(KeyringBackend):
priority = 1

def get_credential(self, url, username):
sys.stderr.write("get_credential was called" + os.linesep)
return SimpleCredential(username="USERNAME", password="PASSWORD")
def get_credential(self, url, username):
sys.stderr.write("get_credential was called" + os.linesep)
return SimpleCredential(username="USERNAME", password="PASSWORD")

def get_password(self, url, username):
sys.stderr.write("get_password was called" + os.linesep)
return "PASSWORD"
def get_password(self, url, username):
assert False, "get_password should not ever be called"

def set_password(self, url, username):
pass
"""
)
keyring_path = keyring_script.site_packages_path / "keyring_test.py"
keyring_path.write_text(keyring_content)
def set_password(self, url, username):
pass
"""
)
keyring_path = keyring_script.site_packages_path / "keyring_test.py"
keyring_path.write_text(keyring_content)

keyring_content = (
"import keyring_test;"
" import keyring;"
" keyring.set_keyring(keyring_test.TestBackend())" + os.linesep
)
keyring_path = keyring_path.with_suffix(".pth")
keyring_path.write_text(keyring_content)
keyring_content = (
"import keyring_test;"
" import keyring;"
" keyring.set_keyring(keyring_test.TestBackend())" + os.linesep
)
keyring_path = keyring_path.with_suffix(".pth")
keyring_path.write_text(keyring_content)

with server_running(server):
result = script.pip(
result = virtualenv_script.pip(
"install",
"--index-url",
url,
Expand All @@ -501,12 +512,7 @@ def set_password(self, url, username):
"simple",
)

function_name = (
"get_credential"
if keyring_provider_implementation == "import"
else "get_password"
)
if auth_needed:
assert function_name + " was called" in result.stderr
assert "get_credential was called" in result.stderr
else:
assert function_name + " was called" not in result.stderr
assert "get_credential was called" not in result.stderr
Loading