Skip to content

Commit

Permalink
reducing repeating of variables
Browse files Browse the repository at this point in the history
  • Loading branch information
ZohebShaikh committed Oct 17, 2024
1 parent 86c6e94 commit fba96e6
Showing 1 changed file with 65 additions and 107 deletions.
172 changes: 65 additions & 107 deletions tests/unit_tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import json
from collections.abc import Mapping
from dataclasses import dataclass
from enum import Enum
from io import StringIO
from pathlib import Path
from textwrap import dedent
Expand Down Expand Up @@ -72,6 +73,11 @@ def test_connection_error_caught_by_wrapper_func(
assert result.stdout == "Failed to establish connection to FastAPI server.\n"


class OidcResponse(Enum):
VERIFICATION_URL = ""
DEVICE_AUTHORIZATION = ""


class MyModel(BaseModel):
id: str

Expand Down Expand Up @@ -637,11 +643,33 @@ def test_logout_missing_config(runner: CliRunner):
assert result.exit_code == 0


TOKEN_URL: str = "https://example.com/token"
DEVICE_AUTHORIZATION_URL: str = "https://example.com/device_authorization"
OIDC_URL: str = (
"https://auth.example.com/realms/sample/.well-known/openid-configuration"
)
OAUTH_CONFIGURATION: dict[str, str] = {
"device_authorization_endpoint": DEVICE_AUTHORIZATION_URL,
"authorization_endpoint": "https://example.com/authorization",
"token_endpoint": TOKEN_URL,
"issuer": "https://example.com",
"jwks_uri": "https://example.com/realms/master/protocol/openid-connect/certs",
"end_session_endpoint": "https://example.com/logout",
}
ERROR_RESPONSE: dict[str, str] = {
"details": "not found",
}
PAYLOAD: dict[str, str] = {
"name": "John Doe",
"fedid": "jd1",
}


@pytest.fixture
def valid_auth_config(tmp_path: Path) -> str:
config = f"""
config: str = f"""
oauth_server:
oidc_config_url: https://auth.example.com/realms/sample/.well-known/openid-configuration
oidc_config_url: {OIDC_URL}
oauth_client:
client_id: sample-cli
client_audience: sample-account
Expand All @@ -654,56 +682,39 @@ def valid_auth_config(tmp_path: Path) -> str:

@responses.activate
def test_login_success(runner: CliRunner, valid_auth_config: str):
payload: dict[str, str] = {
"name": "John Doe",
"fedid": "jd1",
}

mock_json_responses: list[dict[str, str]] = [
{
"device_authorization_endpoint": "https://example.com/device_authorization",
"authorization_endpoint": "https://example.com/authorization",
"token_endpoint": "https://example.com/token",
"issuer": "https://example.com",
"jwks_uri": "https://example.com/realms/master/protocol/openid-connect/certs",
"end_session_endpoint": "https://example.com/logout",
},
{
"device_code": "device_code",
"verification_uri_complete": "https://example.com/verify",
},
{
"access_token": "token",
},
]
with responses.RequestsMock(assert_all_requests_are_fired=True) as requests_mock:
requests_mock.add(
requests_mock.GET,
"https://auth.example.com/realms/sample/.well-known/openid-configuration",
json=mock_json_responses[0],
OIDC_URL,
json=OAUTH_CONFIGURATION,
status=200,
)
requests_mock.add(
requests_mock.POST,
"https://example.com/device_authorization",
json=mock_json_responses[1],
DEVICE_AUTHORIZATION_URL,
json={
"device_code": "device_code",
"verification_uri_complete": "https://example.com/verify",
},
status=200,
)
requests_mock.add(
requests_mock.POST,
"https://example.com/token",
json=mock_json_responses[2],
TOKEN_URL,
json={
"access_token": "token",
},
status=200,
)
with (
patch("blueapi.service.Authenticator.decode_jwt") as mock_decode,
):
mock_decode.return_value = payload
mock_decode.return_value = PAYLOAD
result = runner.invoke(main, ["-c", valid_auth_config, "login"])
assert (
"Logging in\n"
"Please login from this URL:- https://example.com/verify\n"
f"Logged in as {payload['name']} with fed-id {payload['fedid']}\n"
f"Logged in as {PAYLOAD['name']} with fed-id {PAYLOAD['fedid']}\n"
== result.output
)
assert result.exit_code == 0
Expand All @@ -720,39 +731,22 @@ def test_token_login_early_exit(
b'{"access_token":"token","refresh_token":"refresh_token"}'
).decode("utf-8")
)
payload: dict[str, Any] = {
"name": "John Doe",
"fedid": "jd1",
}

mock_json_responses: list[dict[str, str]] = [
{
"device_authorization_endpoint": "https://example.com/device_authorization",
"authorization_endpoint": "https://example.com/authorization",
"token_endpoint": "https://example.com/token",
"issuer": "https://example.com",
"jwks_uri": "https://example.com/realms/master/protocol/openid-connect/certs",
"end_session_endpoint": "https://example.com/logout",
},
{
"access_token": "token",
},
]

with responses.RequestsMock(assert_all_requests_are_fired=True) as requests_mock:
requests_mock.add(
requests_mock.GET,
"https://auth.example.com/realms/sample/.well-known/openid-configuration",
json=mock_json_responses[0],
OIDC_URL,
json=OAUTH_CONFIGURATION,
status=200,
)
with (
patch("blueapi.service.Authenticator.decode_jwt") as mock_decode,
):
mock_decode.side_effect = [payload, payload]
mock_decode.side_effect = [PAYLOAD, PAYLOAD]
result = runner.invoke(main, ["-c", valid_auth_config, "login"])
assert (
"Logging in\n"
f"Logged in as {payload['name']} with fed-id {payload['fedid']}\n"
f"Logged in as {PAYLOAD['name']} with fed-id {PAYLOAD['fedid']}\n"
== result.output
)
assert result.exit_code == 0
Expand All @@ -769,45 +763,30 @@ def test_login_with_refresh_token(
b'{"access_token":"token","refresh_token":"refresh_token"}'
).decode("utf-8")
)
payload: dict[str, Any] = {
"name": "John Doe",
"fedid": "jd1",
}

mock_json_responses: list[dict[str, str]] = [
{
"device_authorization_endpoint": "https://example.com/device_authorization",
"authorization_endpoint": "https://example.com/authorization",
"token_endpoint": "https://example.com/token",
"issuer": "https://example.com",
"jwks_uri": "https://example.com/realms/master/protocol/openid-connect/certs",
"end_session_endpoint": "https://example.com/logout",
},
{
"access_token": "token",
},
]

with responses.RequestsMock(assert_all_requests_are_fired=True) as requests_mock:
requests_mock.add(
requests_mock.GET,
"https://auth.example.com/realms/sample/.well-known/openid-configuration",
json=mock_json_responses[0],
OIDC_URL,
json=OAUTH_CONFIGURATION,
status=200,
)
requests_mock.add(
requests_mock.POST,
"https://example.com/token",
json=mock_json_responses[1],
TOKEN_URL,
json={
"access_token": "token",
},
status=200,
)
with (
patch("blueapi.service.Authenticator.decode_jwt") as mock_decode,
):
mock_decode.side_effect = [jwt.ExpiredSignatureError, payload]
mock_decode.side_effect = [jwt.ExpiredSignatureError, PAYLOAD]
result = runner.invoke(main, ["-c", valid_auth_config, "login"])
assert (
"Logging in\n"
f"Logged in as {payload['name']} with fed-id {payload['fedid']}\n"
f"Logged in as {PAYLOAD['name']} with fed-id {PAYLOAD['fedid']}\n"
== result.output
)
assert result.exit_code == 0
Expand All @@ -822,37 +801,23 @@ def test_login_edge_cases(runner: CliRunner, valid_auth_config: str, tmp_path: P
b'{"access_token":"token","refresh_token":"refresh_token"}'
).decode("utf-8")
)

mock_json_responses: list[dict[str, str]] = [
{
"device_authorization_endpoint": "https://example.com/device_authorization",
"authorization_endpoint": "https://example.com/authorization",
"token_endpoint": "https://example.com/token",
"issuer": "https://example.com",
"jwks_uri": "https://example.com/realms/master/protocol/openid-connect/certs",
"end_session_endpoint": "https://example.com/logout",
},
{
"details": "not found",
},
]
with responses.RequestsMock(assert_all_requests_are_fired=True) as requests_mock:
requests_mock.add(
requests_mock.GET,
"https://auth.example.com/realms/sample/.well-known/openid-configuration",
json=mock_json_responses[0],
OIDC_URL,
json=OAUTH_CONFIGURATION,
status=200,
)
requests_mock.add(
requests_mock.POST,
"https://example.com/token",
json=mock_json_responses[1],
TOKEN_URL,
json=ERROR_RESPONSE,
status=400,
)
requests_mock.add(
requests_mock.POST,
"https://example.com/device_authorization",
json=mock_json_responses[1],
DEVICE_AUTHORIZATION_URL,
json=ERROR_RESPONSE,
status=400,
)
with (
Expand All @@ -871,15 +836,8 @@ def test_logout_success(runner: CliRunner, valid_auth_config: str, tmp_path: Pat
token_file.write(base64.b64encode(b'{"access_token":"token"}').decode("utf-8"))
response = responses.add(
responses.GET,
"https://auth.example.com/realms/sample/.well-known/openid-configuration",
json={
"device_authorization_endpoint": "https://example.com/device_authorization",
"authorization_endpoint": "https://example.com/authorization",
"token_endpoint": "https://example.com/token",
"issuer": "https://example.com",
"jwks_uri": "https://example.com/realms/master/protocol/openid-connect/certs",
"end_session_endpoint": "https://example.com/logout",
},
OIDC_URL,
json=OAUTH_CONFIGURATION,
status=200,
)
assert tmp_path.joinpath("token").exists() is True
Expand Down

0 comments on commit fba96e6

Please sign in to comment.