Skip to content

Commit

Permalink
Add support for pulling username from keyring subprocess provider
Browse files Browse the repository at this point in the history
This fixes pypa#12543 (and probably
pypa#12661).

Somewhat confusingly, when using using `PIP_KEYRING_PROVIDER=import`,
pip was able to fetch both a username and a password from keyring, but
when using `PIP_KEYRING_PROVIDER=subprocess`, it needed the username.

I had to rework the existing tests quite a bit to fit with this new
behavior, as it's now OK to ask for a username/password for an index
even if you don't have a username. This is why `KeyringSubprocessResult`
now subclasses `KeyringModuleV2`. While I was in here, I opted to remove
the "fixtures" values from `KeyringModuleV2` by introducing this new
`add_credential` contextmanager. IMO, they were hurting the readability
of our tests: you had to jump around quite a bit to see what the
contents of the keyring would be during our tests. It also forced all
our tests to play nicely with the same fixtures values, which IMO was an
unnecessary constraint.

Note: per the discussion
[here](pypa#12748 (comment)),
I've opted not to implement any "feature detection" to see if the
`keyring` subprocess supports `--mode=creds`. For the record, this
feature was added in
jaraco/keyring@7830a64,
which landed in keyring v25.2.0.
  • Loading branch information
jfly committed Aug 21, 2024
1 parent 858a515 commit 59ad998
Show file tree
Hide file tree
Showing 4 changed files with 200 additions and 73 deletions.
1 change: 1 addition & 0 deletions news/12543.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add support for pulling username from keyring subprocess provider
24 changes: 13 additions & 11 deletions src/pip/_internal/network/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
providing credentials in the context of network requests.
"""

import json
import logging
import os
import shutil
Expand Down Expand Up @@ -115,23 +116,22 @@ def __init__(self, cmd: str) -> None:
self.keyring = cmd

def get_auth_info(self, url: str, username: Optional[str]) -> Optional[AuthInfo]:
# This is the default implementation of keyring.get_credential
# https://github.com/jaraco/keyring/blob/97689324abcf01bd1793d49063e7ca01e03d7d07/keyring/backend.py#L134-L139
if username is not None:
password = self._get_password(url, username)
if password is not None:
return username, password
return None
return self._get_creds(url, username)

def save_auth_info(self, url: str, username: str, password: str) -> None:
return self._set_password(url, username, password)

def _get_password(self, service_name: str, username: str) -> Optional[str]:
"""Mirror the implementation of keyring.get_password using cli"""
def _get_creds(
self, service_name: str, username: Optional[str]
) -> Optional[AuthInfo]:
"""Mirror the implementation of keyring.get_credential using cli"""
if self.keyring is None:
return None

cmd = [self.keyring, "get", service_name, username]
cmd = [self.keyring, "--mode=creds", "--output=json", "get", service_name]
if username is not None:
cmd.append(username)

env = os.environ.copy()
env["PYTHONIOENCODING"] = "utf-8"
res = subprocess.run(
Expand All @@ -142,7 +142,9 @@ def _get_password(self, service_name: str, username: str) -> Optional[str]:
)
if res.returncode:
return None
return res.stdout.decode("utf-8").strip(os.linesep)

data = json.loads(res.stdout.decode("utf-8"))
return (data["username"], data["password"])

def _set_password(self, service_name: str, username: str, password: str) -> None:
"""Mirror the implementation of keyring.set_password using cli"""
Expand Down
9 changes: 2 additions & 7 deletions tests/functional/test_install_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -501,12 +501,7 @@ def set_password(self, url, username):
"simple",
)

function_name = (
"get_credential"
if keyring_provider_implementation == "import"
else "get_password"
)
if auth_needed:
assert function_name + " was called" in result.stderr
assert "get_credential was called" in result.stderr
else:
assert function_name + " was called" not in result.stderr
assert "get_credential was called" not in result.stderr
239 changes: 184 additions & 55 deletions tests/unit/test_network_auth.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import contextlib
import functools
import json
import os
import subprocess
import sys
from typing import Any, Dict, Iterable, List, Optional, Tuple
from dataclasses import dataclass
from typing import Any, Dict, Generator, Iterable, List, Optional, Tuple

import pytest

Expand Down Expand Up @@ -327,42 +330,98 @@ def _send(sent_req: MockRequest, **kwargs: Any) -> MockResponse:
class KeyringModuleV2:
"""Represents the current supported API of keyring"""

def __init__(self) -> None:
self.saved_credential_by_username_by_system: dict[
str, dict[str, KeyringModuleV2.Credential]
] = {}

@dataclass
class Credential:
def __init__(self, username: str, password: str) -> None:
self.username = username
self.password = password
username: str
password: str

def get_password(self, system: str, username: str) -> None:
pytest.fail("get_password should not ever be called")

def get_credential(self, system: str, username: str) -> Optional[Credential]:
if system == "http://example.com/path2/":
return self.Credential("username", "url")
if system == "example.com":
return self.Credential("username", "netloc")
return None
def get_credential(
self, system: str, username: Optional[str]
) -> Optional[Credential]:
credential_by_username = self.saved_credential_by_username_by_system.get(
system, {}
)
if username is None:
# Just return the first cred we can find (if
# there even are any for this service).
credentials = list(credential_by_username.values())
if len(credentials) == 0:
return None

# Just pick the first one we can find.
credential = credentials[0]
return credential

return credential_by_username.get(username)

def set_password(self, system: str, username: str, password: str) -> None:
if system not in self.saved_credential_by_username_by_system:
self.saved_credential_by_username_by_system[system] = {}

credential_by_username = self.saved_credential_by_username_by_system[system]
assert username not in credential_by_username
credential_by_username[username] = self.Credential(username, password)

def delete_password(self, system: str, username: str) -> None:
del self.saved_credential_by_username_by_system[system][username]

@contextlib.contextmanager
def add_credential(
self, system: str, username: str, password: str
) -> Generator[None, None, None]:
"""
Context manager that adds the given credential to the keyring
and yields. Once the yield is done, the credential is removed
from the keyring.
This is re-entrant safe: it's ok for one thread to call this while in
the middle of an existing invocation
This is probably not thread safe: it's not ok for multiple threads to
simultaneously call this method on the exact same instance of KeyringModuleV2.
"""
self.set_password(system, username, password)
try:
yield
finally:
# No matter what happened, make sure we clean up after ourselves.
self.delete_password(system, username)


@pytest.mark.parametrize(
"url, expect",
[
("http://example.com/path1", ("username", "netloc")),
("http://example.com/path2/path3", ("username", "url")),
("http://[email protected]/path2/path3", ("username", "url")),
("http://example.com/path1", ("username", "hunter2")),
("http://example.com/path2/path3", ("username", "hunter3")),
("http://[email protected]/path2/path3", ("user2", None)),
],
)
def test_keyring_get_credential(
monkeypatch: pytest.MonkeyPatch, url: str, expect: Tuple[str, str]
) -> None:
monkeypatch.setitem(sys.modules, "keyring", KeyringModuleV2())
keyring = KeyringModuleV2()
monkeypatch.setitem(sys.modules, "keyring", keyring)
auth = MultiDomainBasicAuth(
index_urls=["http://example.com/path1", "http://example.com/path2"],
keyring_provider="import",
)

assert (
auth._get_new_credentials(url, allow_netrc=False, allow_keyring=True) == expect
)
with (
keyring.add_credential("example.com", "username", "hunter2"),
keyring.add_credential("http://example.com/path2/", "username", "hunter3"),
):
assert (
auth._get_new_credentials(url, allow_netrc=False, allow_keyring=True)
== expect
)


class KeyringModuleBroken:
Expand Down Expand Up @@ -393,7 +452,7 @@ def test_broken_keyring_disables_keyring(monkeypatch: pytest.MonkeyPatch) -> Non
assert keyring_broken._call_count == 1


class KeyringSubprocessResult(KeyringModuleV1):
class KeyringSubprocessResult(KeyringModuleV2):
"""Represents the subprocess call to keyring"""

returncode = 0 # Default to zero retcode
Expand All @@ -408,33 +467,85 @@ def __call__(
input: Optional[bytes] = None,
check: Optional[bool] = None,
) -> Any:
if cmd[1] == "get":
assert stdin == -3 # subprocess.DEVNULL
assert stdout == subprocess.PIPE
assert env["PYTHONIOENCODING"] == "utf-8"
assert check is None

password = self.get_password(*cmd[2:])
if password is None:
# Expect non-zero returncode if no password present
self.returncode = 1
else:
# Passwords are returned encoded with a newline appended
self.returncode = 0
self.stdout = (password + os.linesep).encode("utf-8")

if cmd[1] == "set":
assert stdin is None
assert stdout is None
assert env["PYTHONIOENCODING"] == "utf-8"
assert input is not None
assert check

# Input from stdin is encoded
self.set_password(cmd[2], cmd[3], input.decode("utf-8").strip(os.linesep))
parsed_cmd = list(cmd)
assert parsed_cmd.pop(0) == "keyring"
subcommand = [
arg
for arg in parsed_cmd
# Skip past all the --whatever options until we get to the subcommand.
if not arg.startswith("--")
][0]
subcommand_func = {
"get": self._get_subcommand,
"set": self._set_subcommand,
}[subcommand]

subcommand_func(
parsed_cmd,
env=env,
stdin=stdin,
stdout=stdout,
input=input,
check=check,
)

return self

def _get_subcommand(
self,
cmd: List[str],
*,
env: Dict[str, str],
stdin: Optional[Any] = None,
stdout: Optional[Any] = None,
input: Optional[bytes] = None,
check: Optional[bool] = None,
) -> None:
assert cmd.pop(0) == "--mode=creds"
assert cmd.pop(0) == "--output=json"
assert stdin == -3 # subprocess.DEVNULL
assert stdout == subprocess.PIPE
assert env["PYTHONIOENCODING"] == "utf-8"
assert check is None
assert cmd.pop(0) == "get"

service = cmd.pop(0)
username = cmd.pop(0) if len(cmd) > 0 else None
creds = self.get_credential(service, username)
if creds is None:
# Expect non-zero returncode if no creds present
self.returncode = 1
else:
# Passwords are returned encoded with a newline appended
self.returncode = 0
self.stdout = json.dumps(
{
"username": creds.username,
"password": creds.password,
}
).encode("utf-8")

def _set_subcommand(
self,
cmd: List[str],
*,
env: Dict[str, str],
stdin: Optional[Any] = None,
stdout: Optional[Any] = None,
input: Optional[bytes] = None,
check: Optional[bool] = None,
) -> None:
assert cmd.pop(0) == "set"
assert stdin is None
assert stdout is None
assert env["PYTHONIOENCODING"] == "utf-8"
assert input is not None
assert check

# Input from stdin is encoded
system, username = cmd
self.set_password(system, username, input.decode("utf-8").strip(os.linesep))

def check_returncode(self) -> None:
if self.returncode:
raise Exception()
Expand All @@ -443,31 +554,42 @@ def check_returncode(self) -> None:
@pytest.mark.parametrize(
"url, expect",
[
("http://example.com/path1", (None, None)),
# path1 URLs will be resolved by netloc
("http://[email protected]/path3", ("user", "user!netloc")),
("http://[email protected]/path3", ("user2", "user2!netloc")),
# path2 URLs will be resolved by index URL
("http://example.com/path2/path3", (None, None)),
("http://[email protected]/path2/path3", ("foo", "foo!url")),
# It's not obvious, but this url ultimately resolves to index url
# http://example.com/path2, so we get the creds for that index.
("http://example.com/path1", ("saved-user1", "pw1")),
("http://[email protected]/path2", ("saved-user1", "pw1")),
("http://[email protected]/path2", ("saved-user2", "pw2")),
("http://[email protected]/path2", ("new-user", None)),
("http://example.com/path2/path3", ("saved-user1", "pw1")),
("http://[email protected]/path2/path3", ("foo", None)),
],
)
def test_keyring_cli_get_password(
monkeypatch: pytest.MonkeyPatch,
url: str,
expect: Tuple[Optional[str], Optional[str]],
) -> None:
keyring_subprocess = KeyringSubprocessResult()
monkeypatch.setattr(pip._internal.network.auth.shutil, "which", lambda x: "keyring")
monkeypatch.setattr(
pip._internal.network.auth.subprocess, "run", KeyringSubprocessResult()
pip._internal.network.auth.subprocess, "run", keyring_subprocess
)
auth = MultiDomainBasicAuth(
index_urls=["http://example.com/path2", "http://example.com/path3"],
keyring_provider="subprocess",
)

actual = auth._get_new_credentials(url, allow_netrc=False, allow_keyring=True)
assert actual == expect
with (
keyring_subprocess.add_credential("example.com", "example", "!netloc"),
keyring_subprocess.add_credential(
"http://example.com/path2/", "saved-user1", "pw1"
),
keyring_subprocess.add_credential(
"http://example.com/path2/", "saved-user2", "pw2"
),
):
actual = auth._get_new_credentials(url, allow_netrc=False, allow_keyring=True)
assert actual == expect


@pytest.mark.parametrize(
Expand All @@ -492,13 +614,14 @@ def test_keyring_cli_set_password(
creds: Tuple[str, str, bool],
expect_save: bool,
) -> None:
expected_username, expected_password, save = creds
monkeypatch.setattr(pip._internal.network.auth.shutil, "which", lambda x: "keyring")
keyring = KeyringSubprocessResult()
monkeypatch.setattr(pip._internal.network.auth.subprocess, "run", keyring)
auth = MultiDomainBasicAuth(prompting=True, keyring_provider="subprocess")
monkeypatch.setattr(auth, "_get_url_and_credentials", lambda u: (u, None, None))
monkeypatch.setattr(auth, "_prompt_for_password", lambda *a: creds)
if creds[2]:
if save:
# when _prompt_for_password indicates to save, we should save
def should_save_password_to_keyring(*a: Any) -> bool:
return True
Expand Down Expand Up @@ -535,6 +658,12 @@ def _send(sent_req: MockRequest, **kwargs: Any) -> MockResponse:
auth.handle_401(resp)

if expect_save:
assert keyring.saved_passwords == [("example.com", creds[0], creds[1])]
assert keyring.saved_credential_by_username_by_system == {
"example.com": {
expected_username: KeyringModuleV2.Credential(
expected_username, expected_password
),
},
}
else:
assert keyring.saved_passwords == []
assert keyring.saved_credential_by_username_by_system == {}

0 comments on commit 59ad998

Please sign in to comment.