-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
add expanded actions to TF AWS IAM resources
- Loading branch information
Showing
16 changed files
with
231 additions
and
57 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Empty file.
85 changes: 85 additions & 0 deletions
85
checkov/common/checks_infra/extensions/iam_action_expansion.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,85 @@ | ||
from __future__ import annotations | ||
|
||
from typing import Any | ||
|
||
from policy_sentry.analysis.expand import expand # type:ignore[import] # will be fixed with the next version | ||
from typing_extensions import Self | ||
|
||
from checkov.common.graph.checks_infra.extensions.base_extension import BaseGraphCheckExtension | ||
from checkov.common.graph.graph_builder.graph_components.attribute_names import CustomAttributes | ||
from checkov.common.models.enums import GraphCheckExtension | ||
from checkov.common.util.data_structures_utils import pickle_deepcopy | ||
from checkov.common.util.type_forcers import force_list | ||
|
||
SUPPORTED_IAM_BLOCKS = { | ||
"aws_iam_group_policy", | ||
"aws_iam_policy", | ||
"aws_iam_role_policy", | ||
"aws_iam_user_policy", | ||
"aws_ssoadmin_permission_set_inline_policy", | ||
"data.aws_iam_policy_document", | ||
} | ||
IAM_POLICY_BLOCKS = { | ||
"aws_iam_group_policy", | ||
"aws_iam_policy", | ||
"aws_iam_role_policy", | ||
"aws_iam_user_policy", | ||
} | ||
|
||
|
||
class IamActionExpansion(BaseGraphCheckExtension): | ||
_instance = None # noqa: CCE003 # singleton | ||
|
||
name = GraphCheckExtension.IAM_ACTION_EXPANSION # noqa: CCE003 # a static attribute | ||
iam_expanded_actions_cache: dict[str, dict[str, Any]] = {} # noqa: CCE003 # global cache | ||
|
||
def __new__(cls) -> Self: | ||
if cls._instance is None: | ||
cls._instance = super().__new__(cls) | ||
|
||
return cls._instance | ||
|
||
def extend(self, vertex_data: dict[str, Any]) -> dict[str, Any]: | ||
if not vertex_data[CustomAttributes.RESOURCE_TYPE] in SUPPORTED_IAM_BLOCKS: | ||
return vertex_data | ||
|
||
cache_key = f"{vertex_data[CustomAttributes.FILE_PATH]}:{vertex_data[CustomAttributes.RESOURCE_TYPE]}:{vertex_data[CustomAttributes.BLOCK_NAME]}" | ||
if cache_key in IamActionExpansion.iam_expanded_actions_cache: | ||
return IamActionExpansion.iam_expanded_actions_cache[cache_key] | ||
|
||
expanded_actions = self._expand_iam_actions(vertex_data=vertex_data) | ||
IamActionExpansion.iam_expanded_actions_cache[cache_key] = expanded_actions | ||
return expanded_actions | ||
|
||
def _expand_iam_actions(self, vertex_data: dict[str, Any]) -> dict[str, Any]: | ||
"""Returns resource data with the expanded actions of an IAM statement | ||
Info: Only AWS Terraform resources are supported for now | ||
""" | ||
|
||
vertex_data = pickle_deepcopy(vertex_data) | ||
resource_type = vertex_data[CustomAttributes.RESOURCE_TYPE] | ||
if resource_type == "data.aws_iam_policy_document": | ||
self._adjust_action_value(policy=vertex_data, statement_key="statement", action_key="actions") | ||
elif resource_type in IAM_POLICY_BLOCKS: | ||
policy = vertex_data.get("policy") | ||
if isinstance(policy, dict): | ||
self._adjust_action_value(policy=policy, statement_key="Statement", action_key="Action") | ||
elif resource_type == "aws_ssoadmin_permission_set_inline_policy": | ||
policy = vertex_data.get("inline_policy") | ||
if isinstance(policy, dict): | ||
self._adjust_action_value(policy=policy, statement_key="Statement", action_key="Action") | ||
|
||
return vertex_data | ||
|
||
def _adjust_action_value(self, policy: dict[str, Any], statement_key: str, action_key: str) -> None: | ||
for statement in force_list(policy.get(statement_key, [])): | ||
if action_key in statement: | ||
original_actions = statement[action_key] | ||
expanded_actions = expand(action=original_actions) | ||
if isinstance(original_actions, list): | ||
statement[action_key].extend(expanded_actions) | ||
else: | ||
expanded_actions = list(expanded_actions) # fix in policy_sentry to be a list not a set | ||
expanded_actions.append(original_actions) | ||
statement[action_key] = expanded_actions |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
from __future__ import annotations | ||
|
||
import logging | ||
from typing import Any | ||
|
||
from typing_extensions import Self | ||
|
||
from checkov.common.checks_infra.extensions.iam_action_expansion import IamActionExpansion | ||
from checkov.common.models.enums import GraphCheckExtension | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
class GraphCheckExtensionsRegistry: | ||
_instance = None # noqa: CCE003 # singleton | ||
|
||
def __new__(cls) -> Self: | ||
if cls._instance is None: | ||
cls._instance = super().__new__(cls) | ||
cls.extensions = { | ||
IamActionExpansion.name: IamActionExpansion(), | ||
} | ||
|
||
return cls._instance | ||
|
||
def run(self, extensions: list[GraphCheckExtension], vertex_data: dict[str, Any]) -> dict[str, Any]: | ||
for extension in extensions: | ||
if extension not in self.extensions: | ||
logger.info(f"Extension {extension} doesn't exist") | ||
continue | ||
|
||
vertex_data = self.extensions[extension].extend(vertex_data=vertex_data) | ||
|
||
return vertex_data |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Empty file.
14 changes: 14 additions & 0 deletions
14
checkov/common/graph/checks_infra/extensions/base_extension.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
from __future__ import annotations | ||
|
||
from abc import ABC, abstractmethod | ||
from typing import Any | ||
|
||
from checkov.common.models.enums import GraphCheckExtension | ||
|
||
|
||
class BaseGraphCheckExtension(ABC): | ||
name: GraphCheckExtension | ||
|
||
@abstractmethod | ||
def extend(self, vertex_data: dict[str, Any]) -> dict[str, Any]: | ||
pass |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.