-
Notifications
You must be signed in to change notification settings - Fork 9
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Feat: helper script to read KeyVault secrets (#1859)
- Loading branch information
Showing
11 changed files
with
272 additions
and
9 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
testsecret=Hello from the local environment! |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,62 @@ | ||
import logging | ||
import os | ||
import sys | ||
|
||
from azure.core.exceptions import ClientAuthenticationError | ||
from azure.identity import DefaultAzureCredential | ||
from azure.keyvault.secrets import SecretClient | ||
from django.conf import settings | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
KEY_VAULT_URL = "https://kv-cdt-pub-calitp-{env}-001.vault.azure.net/" | ||
|
||
|
||
def get_secret_by_name(secret_name, client=None): | ||
"""Read a value from the secret store, currently Azure KeyVault. | ||
When `settings.RUNTIME_ENVIRONMENT() == "local"`, reads from the environment instead. | ||
""" | ||
|
||
runtime_env = settings.RUNTIME_ENVIRONMENT() | ||
|
||
if runtime_env == "local": | ||
logger.debug("Runtime environment is local, reading from environment instead of Azure KeyVault.") | ||
return os.environ.get(secret_name) | ||
|
||
elif client is None: | ||
# construct the KeyVault URL from the runtime environment | ||
# see https://docs.calitp.org/benefits/deployment/infrastructure/#environments | ||
# and https://github.com/cal-itp/benefits/blob/dev/terraform/key_vault.tf | ||
vault_url = KEY_VAULT_URL.format(env=runtime_env[0]) | ||
logger.debug(f"Configuring Azure KeyVault secrets client for: {vault_url}") | ||
|
||
credential = DefaultAzureCredential() | ||
client = SecretClient(vault_url=vault_url, credential=credential) | ||
|
||
secret_value = None | ||
|
||
if client is not None: | ||
try: | ||
secret = client.get_secret(secret_name) | ||
secret_value = secret.value | ||
except ClientAuthenticationError: | ||
logger.error("Could not authenticate to Azure KeyVault") | ||
else: | ||
logger.error("Azure KeyVault SecretClient was not configured") | ||
|
||
return secret_value | ||
|
||
|
||
if __name__ == "__main__": | ||
args = sys.argv[1:] | ||
if len(args) < 1: | ||
print("Provide the name of the secret to read") | ||
exit(1) | ||
|
||
secret_name = args[0] | ||
secret_value = get_secret_by_name(secret_name) | ||
|
||
print(f"[{settings.RUNTIME_ENVIRONMENT()}] {secret_name}: {secret_value}") | ||
exit(0) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,99 @@ | ||
import pytest | ||
from azure.core.exceptions import ClientAuthenticationError | ||
|
||
from benefits.secrets import KEY_VAULT_URL, get_secret_by_name | ||
|
||
|
||
@pytest.fixture(autouse=True) | ||
def mock_DefaultAzureCredential(mocker): | ||
# patching the class to ensure new instances always return the same mock | ||
credential_cls = mocker.patch("benefits.secrets.DefaultAzureCredential") | ||
credential_cls.return_value = mocker.Mock() | ||
return credential_cls | ||
|
||
|
||
@pytest.fixture | ||
def secret_name(): | ||
return "the secret name" | ||
|
||
|
||
@pytest.fixture | ||
def secret_value(): | ||
return "the secret value" | ||
|
||
|
||
@pytest.mark.parametrize("runtime_env", ["dev", "test", "prod"]) | ||
def test_get_secret_by_name__with_client__returns_secret_value(mocker, runtime_env, settings, secret_name, secret_value): | ||
settings.RUNTIME_ENVIRONMENT = lambda: runtime_env | ||
|
||
client = mocker.patch("benefits.secrets.SecretClient") | ||
client.get_secret.return_value = mocker.Mock(value=secret_value) | ||
|
||
actual_value = get_secret_by_name(secret_name, client) | ||
|
||
client.get_secret.assert_called_once_with(secret_name) | ||
assert actual_value == secret_value | ||
|
||
|
||
@pytest.mark.parametrize("runtime_env", ["dev", "test", "prod"]) | ||
def test_get_secret_by_name__None_client__returns_secret_value( | ||
mocker, runtime_env, settings, mock_DefaultAzureCredential, secret_name, secret_value | ||
): | ||
settings.RUNTIME_ENVIRONMENT = lambda: runtime_env | ||
expected_keyvault_url = KEY_VAULT_URL.format(env=runtime_env[0]) | ||
|
||
# this test does not pass in a known client, instead checking that a client is constructed as expected | ||
mock_credential = mock_DefaultAzureCredential.return_value | ||
client_cls = mocker.patch("benefits.secrets.SecretClient") | ||
client = client_cls.return_value | ||
client.get_secret.return_value = mocker.Mock(value=secret_value) | ||
|
||
actual_value = get_secret_by_name(secret_name) | ||
|
||
client_cls.assert_called_once_with(vault_url=expected_keyvault_url, credential=mock_credential) | ||
client.get_secret.assert_called_once_with(secret_name) | ||
assert actual_value == secret_value | ||
|
||
|
||
@pytest.mark.parametrize("runtime_env", ["dev", "test", "prod"]) | ||
def test_get_secret_by_name__None_client__returns_None(mocker, runtime_env, settings, secret_name): | ||
settings.RUNTIME_ENVIRONMENT = lambda: runtime_env | ||
|
||
# this test forces construction of a new client to None | ||
client_cls = mocker.patch("benefits.secrets.SecretClient", return_value=None) | ||
|
||
actual_value = get_secret_by_name(secret_name) | ||
|
||
client_cls.assert_called_once() | ||
assert actual_value is None | ||
|
||
|
||
@pytest.mark.parametrize("runtime_env", ["dev", "test", "prod"]) | ||
def test_get_secret_by_name__unauthenticated_client__returns_None(mocker, runtime_env, settings, secret_name): | ||
settings.RUNTIME_ENVIRONMENT = lambda: runtime_env | ||
|
||
# this test forces client.get_secret to throw an exception | ||
client_cls = mocker.patch("benefits.secrets.SecretClient") | ||
client = client_cls.return_value | ||
client.get_secret.side_effect = ClientAuthenticationError | ||
|
||
actual_value = get_secret_by_name(secret_name) | ||
|
||
client_cls.assert_called_once() | ||
client.get_secret.assert_called_once_with(secret_name) | ||
assert actual_value is None | ||
|
||
|
||
def test_get_secret_by_name__local__returns_environment_variable(mocker, settings, secret_name, secret_value): | ||
settings.RUNTIME_ENVIRONMENT = lambda: "local" | ||
|
||
env_spy = mocker.patch("benefits.secrets.os.environ.get", return_value=secret_value) | ||
client_cls = mocker.patch("benefits.secrets.SecretClient") | ||
client = client_cls.return_value | ||
|
||
actual_value = get_secret_by_name(secret_name) | ||
|
||
client_cls.assert_not_called() | ||
client.get_secret.assert_not_called() | ||
env_spy.assert_called_once_with(secret_name) | ||
assert actual_value == secret_value |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,58 @@ | ||
def test_runtime_environment__default(settings): | ||
assert settings.RUNTIME_ENVIRONMENT() == "local" | ||
|
||
|
||
def test_runtime_environment__dev(settings): | ||
settings.ALLOWED_HOSTS = ["dev-benefits.calitp.org"] | ||
assert settings.RUNTIME_ENVIRONMENT() == "dev" | ||
|
||
|
||
def test_runtime_environment__dev_and_test(settings): | ||
# if both dev and test are specified (edge case/error in configuration), assume dev | ||
settings.ALLOWED_HOSTS = ["test-benefits.calitp.org", "dev-benefits.calitp.org"] | ||
assert settings.RUNTIME_ENVIRONMENT() == "dev" | ||
|
||
|
||
def test_runtime_environment__dev_and_test_and_prod(settings): | ||
# if all 3 of dev and test and prod are specified (edge case/error in configuration), assume dev | ||
settings.ALLOWED_HOSTS = ["benefits.calitp.org", "test-benefits.calitp.org", "dev-benefits.calitp.org"] | ||
assert settings.RUNTIME_ENVIRONMENT() == "dev" | ||
|
||
|
||
def test_runtime_environment__local(settings): | ||
settings.ALLOWED_HOSTS = ["localhost", "127.0.0.1"] | ||
assert settings.RUNTIME_ENVIRONMENT() == "local" | ||
|
||
|
||
def test_runtime_environment__nonmatching(settings): | ||
# with only nonmatching hosts, return local | ||
settings.ALLOWED_HOSTS = ["example.com", "example2.org"] | ||
assert settings.RUNTIME_ENVIRONMENT() == "local" | ||
|
||
|
||
def test_runtime_environment__test(settings): | ||
settings.ALLOWED_HOSTS = ["test-benefits.calitp.org"] | ||
assert settings.RUNTIME_ENVIRONMENT() == "test" | ||
|
||
|
||
def test_runtime_environment__test_and_nonmatching(settings): | ||
# when test is specified with other nonmatching hosts, assume test | ||
settings.ALLOWED_HOSTS = ["test-benefits.calitp.org", "example.com"] | ||
assert settings.RUNTIME_ENVIRONMENT() == "test" | ||
|
||
|
||
def test_runtime_environment__test_and_prod(settings): | ||
# if both test and prod are specified (edge case/error in configuration), assume test | ||
settings.ALLOWED_HOSTS = ["benefits.calitp.org", "test-benefits.calitp.org"] | ||
assert settings.RUNTIME_ENVIRONMENT() == "test" | ||
|
||
|
||
def test_runtime_environment__prod(settings): | ||
settings.ALLOWED_HOSTS = ["benefits.calitp.org"] | ||
assert settings.RUNTIME_ENVIRONMENT() == "prod" | ||
|
||
|
||
def test_runtime_environment__prod_and_nonmatching(settings): | ||
# when prod is specified with other nonmatching hosts, assume prod | ||
settings.ALLOWED_HOSTS = ["benefits.calitp.org", "https://example.com"] | ||
assert settings.RUNTIME_ENVIRONMENT() == "prod" |