Skip to content

Commit

Permalink
feat(terraform): add support for TF cloudsplaining evaluated_keys (#6677
Browse files Browse the repository at this point in the history
)

* TF cloudsplaining

* fix constructed path

* split cloudsplaining between data and resource and add some tests

* fix test lint

* add new tests and fix lowercase actions

* revert resource test file to previous state

* remove list value sorting in TF and fix test

* fix tests

* highlight only actions

* change all tests according to latest logic change

* fix tests

---------

Co-authored-by: Max Amelchenko <[email protected]>
  • Loading branch information
maxamel and Max Amelchenko committed Aug 27, 2024
1 parent 30aa0e1 commit 4e06148
Show file tree
Hide file tree
Showing 15 changed files with 96 additions and 7 deletions.
10 changes: 9 additions & 1 deletion checkov/terraform/checks/data/aws/IAMPrivilegeEscalation.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,15 @@ def __init__(self) -> None:
super().__init__(name=name, id=id)

def cloudsplaining_analysis(self, policy: PolicyDocument) -> Union[List[str], List[Dict[str, Any]]]:
return policy.allows_privilege_escalation
escalations = policy.allows_privilege_escalation
flattened_escalations: list[str] = []
if escalations:
for escalation in escalations:
if isinstance(escalation, dict):
flattened_escalations.extend(escalation.get('actions'))
else:
flattened_escalations.append(escalation)
return flattened_escalations


check = CloudSplainingPrivilegeEscalation()
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import fnmatch
import logging
from abc import ABC
from typing import Dict, List, Any
from typing import Dict, List, Any, Union

from cloudsplaining.scan.policy_document import PolicyDocument

Expand Down Expand Up @@ -28,3 +30,24 @@ def should_scan_conf(self, conf: Dict[str, List[Any]]) -> bool:
def convert_to_iam_policy(self, conf: Dict[str, List[Any]]) -> PolicyDocument:
converted_conf = convert_terraform_conf_to_iam_policy(conf)
return PolicyDocument(converted_conf)

def cloudsplaining_enrich_evaluated_keys(self, policy: PolicyDocument,
violating_actions: Union[List[str], List[Dict[str, Any]]]) -> None:
try:
# in case we have violating actions for this policy we start looking for it through the statements
for stmt_idx, statement in enumerate(policy.statements):
actions = statement.statement.get('Action') # get the actions for this statement
if actions:
if isinstance(actions, str):
for violating_action in violating_actions:
if fnmatch.fnmatch(violating_action.lower(), actions.lower()): # found the violating action in our list of actions
self.evaluated_keys.append(f"statement/[{stmt_idx}]/actions")
return
if isinstance(actions, list):
for action in actions: # go through the actions of this statement and try to match one violation
for violating_action in violating_actions:
if isinstance(action, str) and fnmatch.fnmatch(violating_action.lower(), action.lower()): # found the violating action in our list of actions
self.evaluated_keys.append(f"statement/[{stmt_idx}]/actions")
return
except Exception as e:
logging.warning(f'Failed enriching cloudsplaining evaluated keys due to: {e}')
10 changes: 9 additions & 1 deletion checkov/terraform/checks/resource/aws/IAMPrivilegeEscalation.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,15 @@ def __init__(self) -> None:
super().__init__(name=name, id=id)

def cloudsplaining_analysis(self, policy: PolicyDocument) -> Union[List[str], List[Dict[str, Any]]]:
return policy.allows_privilege_escalation
escalations = policy.allows_privilege_escalation
flattened_escalations: list[str] = []
if escalations:
for escalation in escalations:
if isinstance(escalation, dict):
flattened_escalations.extend(escalation.get('actions'))
else:
flattened_escalations.append(escalation)
return flattened_escalations


check = ResourceCloudSplainingPrivilegeEscalation()
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
from __future__ import annotations

import fnmatch
import logging
from abc import ABC
from typing import Dict, List, Any
from typing import Dict, List, Any, Union

from cloudsplaining.scan.policy_document import PolicyDocument

Expand Down Expand Up @@ -41,3 +43,24 @@ def convert_to_iam_policy(self, conf: Dict[str, Any]) -> PolicyDocument:
policy = conf['policy'][0]

return PolicyDocument(policy)

def cloudsplaining_enrich_evaluated_keys(self, policy: PolicyDocument,
violating_actions: Union[List[str], List[Dict[str, Any]]]) -> None:
try:
# in case we have violating actions for this policy we start looking for it through the statements
for stmt_idx, statement in enumerate(policy.statements):
actions = statement.statement.get('Action') # get the actions for this statement
if actions:
if isinstance(actions, str):
for violating_action in violating_actions:
if fnmatch.fnmatch(violating_action.lower(), actions.lower()): # found the violating action in our list of actions
self.evaluated_keys.append(f"policy/Statement/[{stmt_idx}]/Action")
return
if isinstance(actions, list):
for action in actions: # go through the actions of this statement and try to match one violation
for violating_action in violating_actions:
if isinstance(action, str) and fnmatch.fnmatch(violating_action.lower(), action.lower()): # found the violating action in our list of actions
self.evaluated_keys.append(f"policy/Statement/[{stmt_idx}]/Action")
return
except Exception as e:
logging.warning(f'Failed enriching cloudsplaining evaluated keys due to: {e}')
13 changes: 10 additions & 3 deletions checkov/terraform/checks/utils/base_cloudsplaining_iam_scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@ def scan_conf(self, conf: Dict[str, List[Any]]) -> CheckResult:
if self.cache_key not in BaseTerraformCloudsplainingIAMScanner.policy_document_cache.keys():
policy = self.convert_to_iam_policy(conf)
BaseTerraformCloudsplainingIAMScanner.policy_document_cache[self.cache_key] = policy
violations = self.cloudsplaining_analysis(
BaseTerraformCloudsplainingIAMScanner.policy_document_cache[self.cache_key]
)

policy_document: PolicyDocument = BaseTerraformCloudsplainingIAMScanner.policy_document_cache[self.cache_key]
violations = self.cloudsplaining_analysis(policy_document)
if violations and hasattr(self, 'evaluated_keys'):
self.cloudsplaining_enrich_evaluated_keys(policy_document, violations)
except Exception:
# this might occur with templated iam policies where ARN is not in place or similar
logging.debug(f"could not run cloudsplaining analysis on policy {conf}")
Expand All @@ -51,3 +53,8 @@ def convert_to_iam_policy(self, conf: Dict[str, List[Any]]) -> PolicyDocument:
@abstractmethod
def cloudsplaining_analysis(self, policy: PolicyDocument) -> Union[List[str], List[Dict[str, Any]]]:
raise NotImplementedError()

@abstractmethod
def cloudsplaining_enrich_evaluated_keys(self, policy: PolicyDocument,
violating_actions: Union[List[str], List[Dict[str, Any]]]) -> None:
raise NotImplementedError()
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ def test(self):
self.assertEqual(summary["skipped"], 0)
self.assertEqual(summary["parsing_errors"], 0)

self.assertEqual(['statement/[0]/actions'], report.failed_checks[0].check_result.get('evaluated_keys'))

self.assertEqual(passing_resources, passed_check_resources)
self.assertEqual(failing_resources, failed_check_resources)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ def test(self):
self.assertEqual(summary["skipped"], 0)
self.assertEqual(summary["parsing_errors"], 0)

self.assertEqual(['statement/[0]/actions'], report.failed_checks[0].check_result.get('evaluated_keys'))

self.assertEqual(passing_resources, passed_check_resources)
self.assertEqual(failing_resources, failed_check_resources)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ def test(self):
self.assertEqual(summary["skipped"], 0)
self.assertEqual(summary["parsing_errors"], 0)

self.assertEqual(['statement/[0]/actions'], report.failed_checks[0].check_result.get('evaluated_keys'))

self.assertEqual(passing_resources, passed_check_resources)
self.assertEqual(failing_resources, failed_check_resources)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ def test(self):
self.assertEqual(summary["skipped"], 0)
self.assertEqual(summary["parsing_errors"], 0)

self.assertEqual(['statement/[0]/actions'], report.failed_checks[0].check_result.get('evaluated_keys'))

self.assertEqual(passing_resources, passed_check_resources)
self.assertEqual(failing_resources, failed_check_resources)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ def test(self):
self.assertEqual(summary["skipped"], 0)
self.assertEqual(summary["parsing_errors"], 0)

self.assertEqual(['statement/[0]/actions'], report.failed_checks[0].check_result.get('evaluated_keys'))

self.assertEqual(passing_resources, passed_check_resources)
self.assertEqual(failing_resources, failed_check_resources)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ def test(self):
self.assertEqual(summary["skipped"], 0)
self.assertEqual(summary["parsing_errors"], 0)

self.assertEqual(report.failed_checks[0].check_result.get('evaluated_keys'), ['policy/Statement/[0]/Action'])

self.assertEqual(passing_resources, passed_check_resources)
self.assertEqual(failing_resources, failed_check_resources)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ def test(self):
self.assertEqual(summary["skipped"], 0)
self.assertEqual(summary["parsing_errors"], 0)

self.assertEqual(report.failed_checks[0].check_result.get('evaluated_keys'), ['policy/Statement/[0]/Action'])

self.assertEqual(passing_resources, passed_check_resources)
self.assertEqual(failing_resources, failed_check_resources)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ def test(self):
self.assertEqual(summary["skipped"], 0)
self.assertEqual(summary["parsing_errors"], 0)

self.assertEqual(report.failed_checks[0].check_result.get('evaluated_keys'), ['policy/Statement/[0]/Action'])

self.assertEqual(passing_resources, passed_check_resources)
self.assertEqual(failing_resources, failed_check_resources)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ def test(self):
self.assertEqual(summary["skipped"], 0)
self.assertEqual(summary["parsing_errors"], 0)

self.assertEqual(report.failed_checks[0].check_result.get('evaluated_keys'), ['policy/Statement/[0]/Action'])

self.assertEqual(passing_resources, passed_check_resources)
self.assertEqual(failing_resources, failed_check_resources)

Expand Down
2 changes: 2 additions & 0 deletions tests/terraform/checks/resource/aws/test_IAMWriteAccess.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ def test(self):
self.assertEqual(summary["skipped"], 0)
self.assertEqual(summary["parsing_errors"], 0)

self.assertEqual(report.failed_checks[0].check_result.get('evaluated_keys'), ['policy/Statement/[0]/Action'])

self.assertEqual(passing_resources, passed_check_resources)
self.assertEqual(failing_resources, failed_check_resources)

Expand Down

0 comments on commit 4e06148

Please sign in to comment.