diff --git a/testsuite/objects/__init__.py b/testsuite/objects/__init__.py index a2338a22..33e83a97 100644 --- a/testsuite/objects/__init__.py +++ b/testsuite/objects/__init__.py @@ -1,11 +1,50 @@ """Module containing base classes for common objects""" import abc -from dataclasses import dataclass +from dataclasses import dataclass, is_dataclass, fields +from copy import deepcopy from functools import cached_property -from typing import Literal, List +from typing import Literal, List, Dict, Union from testsuite.objects.sections import Metadata, Identities, Authorizations, Responses +JSONValues = Union[None, str, int, bool, List["JSONValues"], Dict[str, "JSONValues"]] + + +def asdict(obj) -> Dict[str, JSONValues]: + """ + This function converts dataclass object to dictionary. + While it works similar to `dataclasses.asdict` a notable change is usage of + overriding `to_dict()` function if dataclass contains it. + This function works recursively in lists, tuples and dicts. All other values are passed to copy.deepcopy function. + """ + if not is_dataclass(obj): + raise TypeError("asdict() should be called on dataclass instances") + return _asdict_recurse(obj) + + +def _asdict_recurse(obj): + if hasattr(obj, "asdict"): + return obj.asdict() + + if not is_dataclass(obj): + return deepcopy(obj) + + result = {} + for field in fields(obj): + value = getattr(obj, field.name) + if value is None: + continue # do not include None values + + if is_dataclass(value): + result[field.name] = _asdict_recurse(value) + elif isinstance(value, (list, tuple)): + result[field.name] = type(value)(_asdict_recurse(i) for i in value) + elif isinstance(value, dict): + result[field.name] = type(value)((_asdict_recurse(k), _asdict_recurse(v)) for k, v in value.items()) + else: + result[field.name] = deepcopy(value) + return result + @dataclass class MatchExpression: @@ -35,20 +74,30 @@ class Rule: value: str -class Value: - """Dataclass for specifying a Value in Authorization, can be either constant or value from AuthJson (jsonPath)""" +@dataclass +class ABCValue(abc.ABC): + """ + Abstract Dataclass for specifying a Value in Authorization, + can be either static or reference to value in AuthJson. + """ + - # pylint: disable=invalid-name - def __init__(self, value=None, jsonPath=None) -> None: - super().__init__() - if not (value is None) ^ (jsonPath is None): - raise AttributeError("Exactly one of the `value` and `jsonPath` argument must be specified") - self.value = value - self.jsonPath = jsonPath +@dataclass +class Value(ABCValue): + """Dataclass for static Value. Can be any value allowed in JSON: None, string, integer, bool, list, dict""" - def to_dict(self): - """Returns dict representation of itself (shallow copy only)""" - return {"value": self.value} if self.value else {"valueFrom": {"authJson": self.jsonPath}} + value: JSONValues + + +@dataclass +class ValueFrom(ABCValue): + """Dataclass for dynamic Value. It contains reference path to existing value in AuthJson.""" + + authJSON: str # pylint: disable=invalid-name + + def asdict(self): + """Override `asdict` function""" + return {"valueFrom": {"authJSON": self.authJSON}} @dataclass @@ -56,19 +105,14 @@ class Cache: """Dataclass for specifying Cache in Authorization""" ttl: int - key: Value - - def to_dict(self): - """Returns dict representation of itself (shallow copy only)""" - return {"ttl": self.ttl, "key": self.key.to_dict()} + key: ABCValue @dataclass class PatternRef: """Dataclass for specifying Pattern reference in Authorization""" - # pylint: disable=invalid-name - patternRef: str + patternRef: str # pylint: disable=invalid-name class LifecycleObject(abc.ABC): diff --git a/testsuite/objects/sections.py b/testsuite/objects/sections.py index c6f21a10..21af511b 100644 --- a/testsuite/objects/sections.py +++ b/testsuite/objects/sections.py @@ -4,7 +4,7 @@ from typing import TYPE_CHECKING if TYPE_CHECKING: - from testsuite.objects import Rule, Value + from testsuite.objects import Rule, ABCValue class Authorizations(abc.ABC): @@ -27,7 +27,7 @@ def auth_rule(self, name: str, rule: "Rule", **common_features): """Adds JSON pattern-matching authorization rule (authorization.json)""" @abc.abstractmethod - def kubernetes(self, name: str, user: "Value", kube_attrs: dict, **common_features): + def kubernetes(self, name: str, user: "ABCValue", kube_attrs: dict, **common_features): """Adds kubernetes authorization rule.""" diff --git a/testsuite/openshift/objects/auth_config/sections.py b/testsuite/openshift/objects/auth_config/sections.py index 9cb18602..22e6d4b0 100644 --- a/testsuite/openshift/objects/auth_config/sections.py +++ b/testsuite/openshift/objects/auth_config/sections.py @@ -1,8 +1,17 @@ """AuthConfig CR object""" -from dataclasses import asdict from typing import Dict, Literal, Iterable, TYPE_CHECKING -from testsuite.objects import Identities, Metadata, Responses, MatchExpression, Authorizations, Rule, Cache, Value +from testsuite.objects import ( + asdict, + Identities, + Metadata, + Responses, + MatchExpression, + Authorizations, + Rule, + Cache, + ABCValue, +) from testsuite.openshift.objects import modify if TYPE_CHECKING: @@ -45,7 +54,7 @@ def add_item( if metrics: item["metrics"] = metrics if cache: - item["cache"] = cache.to_dict() + item["cache"] = asdict(cache) if priority: item["priority"] = priority self.section.append(item) @@ -215,7 +224,7 @@ def external_opa_policy(self, name, endpoint, ttl=0, **common_features): self.add_item(name, {"opa": {"externalRegistry": {"endpoint": endpoint, "ttl": ttl}}}, **common_features) @modify - def kubernetes(self, name: str, user: Value, kube_attrs: dict, **common_features): + def kubernetes(self, name: str, user: ABCValue, kube_attrs: dict, **common_features): """Adds Kubernetes authorization :param name: name of kubernetes authorization @@ -226,7 +235,7 @@ def kubernetes(self, name: str, user: Value, kube_attrs: dict, **common_features self.add_item( name, { - "kubernetes": {"user": user.to_dict(), "resourceAttributes": kube_attrs}, + "kubernetes": {"user": asdict(user), "resourceAttributes": kube_attrs}, }, **common_features ) diff --git a/testsuite/tests/kuadrant/authorino/caching/metadata/test_caching.py b/testsuite/tests/kuadrant/authorino/caching/metadata/test_caching.py index d475d541..0910bd73 100644 --- a/testsuite/tests/kuadrant/authorino/caching/metadata/test_caching.py +++ b/testsuite/tests/kuadrant/authorino/caching/metadata/test_caching.py @@ -3,7 +3,7 @@ import pytest -from testsuite.objects import Cache, Value +from testsuite.objects import Cache, ValueFrom from testsuite.utils import extract_response @@ -16,7 +16,7 @@ def cache_ttl(): @pytest.fixture(scope="module") def authorization(authorization, module_label, expectation_path, cache_ttl): """Adds Cached Metadata to the AuthConfig""" - meta_cache = Cache(cache_ttl, Value(jsonPath="context.request.http.path")) + meta_cache = Cache(cache_ttl, ValueFrom("context.request.http.path")) authorization.metadata.http_metadata(module_label, expectation_path, "GET", cache=meta_cache) return authorization diff --git a/testsuite/tests/kuadrant/authorino/operator/tls/test_webhook.py b/testsuite/tests/kuadrant/authorino/operator/tls/test_webhook.py index f903b371..fd217fee 100644 --- a/testsuite/tests/kuadrant/authorino/operator/tls/test_webhook.py +++ b/testsuite/tests/kuadrant/authorino/operator/tls/test_webhook.py @@ -8,7 +8,7 @@ import openshift as oc from openshift import OpenShiftPythonException -from testsuite.objects import Authorization, Rule, Value +from testsuite.objects import Authorization, Rule, ValueFrom from testsuite.certificates import CertInfo from testsuite.utils import cert_builder from testsuite.openshift.objects.ingress import Ingress @@ -78,7 +78,7 @@ def authorization(authorization, openshift, module_label, authorino_domain) -> A # add OPA policy to process admission webhook request authorization.authorization.opa_policy("features", OPA_POLICY) - user_value = Value(jsonPath="auth.identity.username") + user_value = ValueFrom("auth.identity.username") when = [ Rule("auth.authorization.features.allow", "eq", "true"),