Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
caspervdw committed Feb 14, 2024
1 parent be4757f commit fdf93dc
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 14 deletions.
13 changes: 0 additions & 13 deletions nens_auth_client/tests/test_users.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from django.contrib.auth.models import User
from django.db import IntegrityError
from nens_auth_client.users import _extract_provider_name
from nens_auth_client.users import create_remote_user
from nens_auth_client.users import create_user
from nens_auth_client.users import update_remote_user
Expand Down Expand Up @@ -182,15 +181,3 @@ def test_create_user_username_exists(user_mgr, create_user_m, mocker):
first_call, second_call = create_user_m.call_args_list
assert first_call[0] == ("testuser", "abc")
assert second_call[0] == ("testuserx23f", "abc")


def test_extract_provider_name_present():
# Extract provider name when it is present.
claims = {"identities": [{"providerName": "Google"}]}
assert _extract_provider_name(claims) == "Google"


def test_extract_provider_name_absent():
# Return None when a provider name cannot be found.
claims = {"some": "claim"}
assert not _extract_provider_name(claims)
31 changes: 30 additions & 1 deletion nens_auth_client/wso2.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
from authlib.integrations.django_client import DjangoOAuth2App
from authlib.jose import JsonWebKey
from authlib.jose import JsonWebToken
from django.http.response import HttpResponseRedirect
from urllib.parse import urlencode
from urllib.parse import urlparse
Expand Down Expand Up @@ -43,7 +45,34 @@ def logout_redirect(self, request, redirect_uri=None, login_after=False):
return HttpResponseRedirect(logout_url)

def parse_access_token(self, token, claims_options=None, leeway=120):
raise NotImplementedError()
# this is a copy from the _parse_id_token equivalent function
def load_key(header, payload):
jwk_set = self.fetch_jwk_set()
kid = header.get("kid")
try:
return JsonWebKey.import_key_set(jwk_set).find_by_kid(kid)
except ValueError:
# re-try with new jwk set
jwk_set = self.fetch_jwk_set(force=True)
return JsonWebKey.import_key_set(jwk_set).find_by_kid(kid)

metadata = self.load_server_metadata()
claims_options = {
"iss": {"essential": True, "value": metadata["issuer"]},
"sub": {"essential": True},
**(claims_options or {}),
}

alg_values = metadata.get("id_token_signing_alg_values_supported")
if not alg_values:
alg_values = ["RS256"]

claims = JsonWebToken(alg_values).decode(
token, key=load_key, claims_options=claims_options
)

claims.validate(leeway=leeway)
return claims

def extract_provider_name(claims):
"""Return provider name from claim and `None` if not found"""
Expand Down

0 comments on commit fdf93dc

Please sign in to comment.