From 4e06148b317489d9a0a50b4ab99c91d506ee38c2 Mon Sep 17 00:00:00 2001 From: Max Amelchenko Date: Tue, 27 Aug 2024 14:01:32 +0300 Subject: [PATCH] feat(terraform): add support for TF cloudsplaining evaluated_keys (#6677) * TF cloudsplaining * fix constructed path * split cloudsplaining between data and resource and add some tests * fix test lint * add new tests and fix lowercase actions * revert resource test file to previous state * remove list value sorting in TF and fix test * fix tests * highlight only actions * change all tests according to latest logic change * fix tests --------- Co-authored-by: Max Amelchenko --- .../checks/data/aws/IAMPrivilegeEscalation.py | 10 +++++++- .../base_cloudsplaining_data_iam_check.py | 25 ++++++++++++++++++- .../resource/aws/IAMPrivilegeEscalation.py | 10 +++++++- .../base_cloudsplaining_resource_iam_check.py | 25 ++++++++++++++++++- .../utils/base_cloudsplaining_iam_scanner.py | 13 +++++++--- .../test_CloudSplainingCredentialsExposure.py | 2 ++ .../test_CloudSplainingDataExfiltration.py | 2 ++ .../test_CloudSplainingPrivilegeEscalation.py | 2 ++ .../data/aws/test_CloudsplainingIAMWrite.py | 2 ++ ...est_CloudsplainingPermissionsManagement.py | 2 ++ .../aws/test_IAMCredentialsExposure.py | 2 ++ .../resource/aws/test_IAMDataExfiltration.py | 2 ++ .../aws/test_IAMPermissionsManagement.py | 2 ++ .../aws/test_IAMPrivilegeEscalation.py | 2 ++ .../resource/aws/test_IAMWriteAccess.py | 2 ++ 15 files changed, 96 insertions(+), 7 deletions(-) diff --git a/checkov/terraform/checks/data/aws/IAMPrivilegeEscalation.py b/checkov/terraform/checks/data/aws/IAMPrivilegeEscalation.py index 980b551518a..b75bbaa7964 100644 --- a/checkov/terraform/checks/data/aws/IAMPrivilegeEscalation.py +++ b/checkov/terraform/checks/data/aws/IAMPrivilegeEscalation.py @@ -15,7 +15,15 @@ def __init__(self) -> None: super().__init__(name=name, id=id) def cloudsplaining_analysis(self, policy: PolicyDocument) -> Union[List[str], List[Dict[str, Any]]]: - return policy.allows_privilege_escalation + escalations = policy.allows_privilege_escalation + flattened_escalations: list[str] = [] + if escalations: + for escalation in escalations: + if isinstance(escalation, dict): + flattened_escalations.extend(escalation.get('actions')) + else: + flattened_escalations.append(escalation) + return flattened_escalations check = CloudSplainingPrivilegeEscalation() diff --git a/checkov/terraform/checks/data/base_cloudsplaining_data_iam_check.py b/checkov/terraform/checks/data/base_cloudsplaining_data_iam_check.py index 4bc83d6e996..8f2e43797cb 100644 --- a/checkov/terraform/checks/data/base_cloudsplaining_data_iam_check.py +++ b/checkov/terraform/checks/data/base_cloudsplaining_data_iam_check.py @@ -1,5 +1,7 @@ +import fnmatch +import logging from abc import ABC -from typing import Dict, List, Any +from typing import Dict, List, Any, Union from cloudsplaining.scan.policy_document import PolicyDocument @@ -28,3 +30,24 @@ def should_scan_conf(self, conf: Dict[str, List[Any]]) -> bool: def convert_to_iam_policy(self, conf: Dict[str, List[Any]]) -> PolicyDocument: converted_conf = convert_terraform_conf_to_iam_policy(conf) return PolicyDocument(converted_conf) + + def cloudsplaining_enrich_evaluated_keys(self, policy: PolicyDocument, + violating_actions: Union[List[str], List[Dict[str, Any]]]) -> None: + try: + # in case we have violating actions for this policy we start looking for it through the statements + for stmt_idx, statement in enumerate(policy.statements): + actions = statement.statement.get('Action') # get the actions for this statement + if actions: + if isinstance(actions, str): + for violating_action in violating_actions: + if fnmatch.fnmatch(violating_action.lower(), actions.lower()): # found the violating action in our list of actions + self.evaluated_keys.append(f"statement/[{stmt_idx}]/actions") + return + if isinstance(actions, list): + for action in actions: # go through the actions of this statement and try to match one violation + for violating_action in violating_actions: + if isinstance(action, str) and fnmatch.fnmatch(violating_action.lower(), action.lower()): # found the violating action in our list of actions + self.evaluated_keys.append(f"statement/[{stmt_idx}]/actions") + return + except Exception as e: + logging.warning(f'Failed enriching cloudsplaining evaluated keys due to: {e}') diff --git a/checkov/terraform/checks/resource/aws/IAMPrivilegeEscalation.py b/checkov/terraform/checks/resource/aws/IAMPrivilegeEscalation.py index 080a20866ab..d1d77cb5535 100644 --- a/checkov/terraform/checks/resource/aws/IAMPrivilegeEscalation.py +++ b/checkov/terraform/checks/resource/aws/IAMPrivilegeEscalation.py @@ -16,7 +16,15 @@ def __init__(self) -> None: super().__init__(name=name, id=id) def cloudsplaining_analysis(self, policy: PolicyDocument) -> Union[List[str], List[Dict[str, Any]]]: - return policy.allows_privilege_escalation + escalations = policy.allows_privilege_escalation + flattened_escalations: list[str] = [] + if escalations: + for escalation in escalations: + if isinstance(escalation, dict): + flattened_escalations.extend(escalation.get('actions')) + else: + flattened_escalations.append(escalation) + return flattened_escalations check = ResourceCloudSplainingPrivilegeEscalation() diff --git a/checkov/terraform/checks/resource/base_cloudsplaining_resource_iam_check.py b/checkov/terraform/checks/resource/base_cloudsplaining_resource_iam_check.py index 6279b224828..333a9f67e0a 100644 --- a/checkov/terraform/checks/resource/base_cloudsplaining_resource_iam_check.py +++ b/checkov/terraform/checks/resource/base_cloudsplaining_resource_iam_check.py @@ -1,7 +1,9 @@ from __future__ import annotations +import fnmatch +import logging from abc import ABC -from typing import Dict, List, Any +from typing import Dict, List, Any, Union from cloudsplaining.scan.policy_document import PolicyDocument @@ -41,3 +43,24 @@ def convert_to_iam_policy(self, conf: Dict[str, Any]) -> PolicyDocument: policy = conf['policy'][0] return PolicyDocument(policy) + + def cloudsplaining_enrich_evaluated_keys(self, policy: PolicyDocument, + violating_actions: Union[List[str], List[Dict[str, Any]]]) -> None: + try: + # in case we have violating actions for this policy we start looking for it through the statements + for stmt_idx, statement in enumerate(policy.statements): + actions = statement.statement.get('Action') # get the actions for this statement + if actions: + if isinstance(actions, str): + for violating_action in violating_actions: + if fnmatch.fnmatch(violating_action.lower(), actions.lower()): # found the violating action in our list of actions + self.evaluated_keys.append(f"policy/Statement/[{stmt_idx}]/Action") + return + if isinstance(actions, list): + for action in actions: # go through the actions of this statement and try to match one violation + for violating_action in violating_actions: + if isinstance(action, str) and fnmatch.fnmatch(violating_action.lower(), action.lower()): # found the violating action in our list of actions + self.evaluated_keys.append(f"policy/Statement/[{stmt_idx}]/Action") + return + except Exception as e: + logging.warning(f'Failed enriching cloudsplaining evaluated keys due to: {e}') diff --git a/checkov/terraform/checks/utils/base_cloudsplaining_iam_scanner.py b/checkov/terraform/checks/utils/base_cloudsplaining_iam_scanner.py index fa983d98543..92efc913297 100644 --- a/checkov/terraform/checks/utils/base_cloudsplaining_iam_scanner.py +++ b/checkov/terraform/checks/utils/base_cloudsplaining_iam_scanner.py @@ -23,9 +23,11 @@ def scan_conf(self, conf: Dict[str, List[Any]]) -> CheckResult: if self.cache_key not in BaseTerraformCloudsplainingIAMScanner.policy_document_cache.keys(): policy = self.convert_to_iam_policy(conf) BaseTerraformCloudsplainingIAMScanner.policy_document_cache[self.cache_key] = policy - violations = self.cloudsplaining_analysis( - BaseTerraformCloudsplainingIAMScanner.policy_document_cache[self.cache_key] - ) + + policy_document: PolicyDocument = BaseTerraformCloudsplainingIAMScanner.policy_document_cache[self.cache_key] + violations = self.cloudsplaining_analysis(policy_document) + if violations and hasattr(self, 'evaluated_keys'): + self.cloudsplaining_enrich_evaluated_keys(policy_document, violations) except Exception: # this might occur with templated iam policies where ARN is not in place or similar logging.debug(f"could not run cloudsplaining analysis on policy {conf}") @@ -51,3 +53,8 @@ def convert_to_iam_policy(self, conf: Dict[str, List[Any]]) -> PolicyDocument: @abstractmethod def cloudsplaining_analysis(self, policy: PolicyDocument) -> Union[List[str], List[Dict[str, Any]]]: raise NotImplementedError() + + @abstractmethod + def cloudsplaining_enrich_evaluated_keys(self, policy: PolicyDocument, + violating_actions: Union[List[str], List[Dict[str, Any]]]) -> None: + raise NotImplementedError() diff --git a/tests/terraform/checks/data/aws/test_CloudSplainingCredentialsExposure.py b/tests/terraform/checks/data/aws/test_CloudSplainingCredentialsExposure.py index fb2996a5463..fdfb9df0754 100644 --- a/tests/terraform/checks/data/aws/test_CloudSplainingCredentialsExposure.py +++ b/tests/terraform/checks/data/aws/test_CloudSplainingCredentialsExposure.py @@ -35,6 +35,8 @@ def test(self): self.assertEqual(summary["skipped"], 0) self.assertEqual(summary["parsing_errors"], 0) + self.assertEqual(['statement/[0]/actions'], report.failed_checks[0].check_result.get('evaluated_keys')) + self.assertEqual(passing_resources, passed_check_resources) self.assertEqual(failing_resources, failed_check_resources) diff --git a/tests/terraform/checks/data/aws/test_CloudSplainingDataExfiltration.py b/tests/terraform/checks/data/aws/test_CloudSplainingDataExfiltration.py index c45e27cac89..1f6bc69ee66 100644 --- a/tests/terraform/checks/data/aws/test_CloudSplainingDataExfiltration.py +++ b/tests/terraform/checks/data/aws/test_CloudSplainingDataExfiltration.py @@ -33,6 +33,8 @@ def test(self): self.assertEqual(summary["skipped"], 0) self.assertEqual(summary["parsing_errors"], 0) + self.assertEqual(['statement/[0]/actions'], report.failed_checks[0].check_result.get('evaluated_keys')) + self.assertEqual(passing_resources, passed_check_resources) self.assertEqual(failing_resources, failed_check_resources) diff --git a/tests/terraform/checks/data/aws/test_CloudSplainingPrivilegeEscalation.py b/tests/terraform/checks/data/aws/test_CloudSplainingPrivilegeEscalation.py index 00f34b95435..3a1ec435e9f 100644 --- a/tests/terraform/checks/data/aws/test_CloudSplainingPrivilegeEscalation.py +++ b/tests/terraform/checks/data/aws/test_CloudSplainingPrivilegeEscalation.py @@ -33,6 +33,8 @@ def test(self): self.assertEqual(summary["skipped"], 0) self.assertEqual(summary["parsing_errors"], 0) + self.assertEqual(['statement/[0]/actions'], report.failed_checks[0].check_result.get('evaluated_keys')) + self.assertEqual(passing_resources, passed_check_resources) self.assertEqual(failing_resources, failed_check_resources) diff --git a/tests/terraform/checks/data/aws/test_CloudsplainingIAMWrite.py b/tests/terraform/checks/data/aws/test_CloudsplainingIAMWrite.py index 7ee30d3c57d..e79bcd65feb 100644 --- a/tests/terraform/checks/data/aws/test_CloudsplainingIAMWrite.py +++ b/tests/terraform/checks/data/aws/test_CloudsplainingIAMWrite.py @@ -34,6 +34,8 @@ def test(self): self.assertEqual(summary["skipped"], 0) self.assertEqual(summary["parsing_errors"], 0) + self.assertEqual(['statement/[0]/actions'], report.failed_checks[0].check_result.get('evaluated_keys')) + self.assertEqual(passing_resources, passed_check_resources) self.assertEqual(failing_resources, failed_check_resources) diff --git a/tests/terraform/checks/data/aws/test_CloudsplainingPermissionsManagement.py b/tests/terraform/checks/data/aws/test_CloudsplainingPermissionsManagement.py index 67c98e5ceab..91943139afe 100644 --- a/tests/terraform/checks/data/aws/test_CloudsplainingPermissionsManagement.py +++ b/tests/terraform/checks/data/aws/test_CloudsplainingPermissionsManagement.py @@ -33,6 +33,8 @@ def test(self): self.assertEqual(summary["skipped"], 0) self.assertEqual(summary["parsing_errors"], 0) + self.assertEqual(['statement/[0]/actions'], report.failed_checks[0].check_result.get('evaluated_keys')) + self.assertEqual(passing_resources, passed_check_resources) self.assertEqual(failing_resources, failed_check_resources) diff --git a/tests/terraform/checks/resource/aws/test_IAMCredentialsExposure.py b/tests/terraform/checks/resource/aws/test_IAMCredentialsExposure.py index 811903dc608..7b8573f12d3 100644 --- a/tests/terraform/checks/resource/aws/test_IAMCredentialsExposure.py +++ b/tests/terraform/checks/resource/aws/test_IAMCredentialsExposure.py @@ -35,6 +35,8 @@ def test(self): self.assertEqual(summary["skipped"], 0) self.assertEqual(summary["parsing_errors"], 0) + self.assertEqual(report.failed_checks[0].check_result.get('evaluated_keys'), ['policy/Statement/[0]/Action']) + self.assertEqual(passing_resources, passed_check_resources) self.assertEqual(failing_resources, failed_check_resources) diff --git a/tests/terraform/checks/resource/aws/test_IAMDataExfiltration.py b/tests/terraform/checks/resource/aws/test_IAMDataExfiltration.py index 9463489d2e0..08a3152984e 100644 --- a/tests/terraform/checks/resource/aws/test_IAMDataExfiltration.py +++ b/tests/terraform/checks/resource/aws/test_IAMDataExfiltration.py @@ -33,6 +33,8 @@ def test(self): self.assertEqual(summary["skipped"], 0) self.assertEqual(summary["parsing_errors"], 0) + self.assertEqual(report.failed_checks[0].check_result.get('evaluated_keys'), ['policy/Statement/[0]/Action']) + self.assertEqual(passing_resources, passed_check_resources) self.assertEqual(failing_resources, failed_check_resources) diff --git a/tests/terraform/checks/resource/aws/test_IAMPermissionsManagement.py b/tests/terraform/checks/resource/aws/test_IAMPermissionsManagement.py index 06d59bd1750..8bf2855e068 100644 --- a/tests/terraform/checks/resource/aws/test_IAMPermissionsManagement.py +++ b/tests/terraform/checks/resource/aws/test_IAMPermissionsManagement.py @@ -33,6 +33,8 @@ def test(self): self.assertEqual(summary["skipped"], 0) self.assertEqual(summary["parsing_errors"], 0) + self.assertEqual(report.failed_checks[0].check_result.get('evaluated_keys'), ['policy/Statement/[0]/Action']) + self.assertEqual(passing_resources, passed_check_resources) self.assertEqual(failing_resources, failed_check_resources) diff --git a/tests/terraform/checks/resource/aws/test_IAMPrivilegeEscalation.py b/tests/terraform/checks/resource/aws/test_IAMPrivilegeEscalation.py index 0b27a2908ff..d601316e8f1 100644 --- a/tests/terraform/checks/resource/aws/test_IAMPrivilegeEscalation.py +++ b/tests/terraform/checks/resource/aws/test_IAMPrivilegeEscalation.py @@ -32,6 +32,8 @@ def test(self): self.assertEqual(summary["skipped"], 0) self.assertEqual(summary["parsing_errors"], 0) + self.assertEqual(report.failed_checks[0].check_result.get('evaluated_keys'), ['policy/Statement/[0]/Action']) + self.assertEqual(passing_resources, passed_check_resources) self.assertEqual(failing_resources, failed_check_resources) diff --git a/tests/terraform/checks/resource/aws/test_IAMWriteAccess.py b/tests/terraform/checks/resource/aws/test_IAMWriteAccess.py index 7cd609a8772..41406abd945 100644 --- a/tests/terraform/checks/resource/aws/test_IAMWriteAccess.py +++ b/tests/terraform/checks/resource/aws/test_IAMWriteAccess.py @@ -34,6 +34,8 @@ def test(self): self.assertEqual(summary["skipped"], 0) self.assertEqual(summary["parsing_errors"], 0) + self.assertEqual(report.failed_checks[0].check_result.get('evaluated_keys'), ['policy/Statement/[0]/Action']) + self.assertEqual(passing_resources, passed_check_resources) self.assertEqual(failing_resources, failed_check_resources)