Skip to content

Commit

Permalink
Merge branch 'fix/issue-6686-unrecognised-sg-in-docdbelastic' of gith…
Browse files Browse the repository at this point in the history
…ub.com:bhean/checkov into fix/issue-6686-unrecognised-sg-in-docdbelastic
  • Loading branch information
bhean committed Aug 27, 2024
2 parents 4faa1b8 + 9de662c commit a0a1b20
Show file tree
Hide file tree
Showing 19 changed files with 106 additions and 13 deletions.
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
# CHANGELOG

## [Unreleased](https://github.com/bridgecrewio/checkov/compare/3.2.235...HEAD)
## [Unreleased](https://github.com/bridgecrewio/checkov/compare/3.2.236...HEAD)

## [3.2.236](https://github.com/bridgecrewio/checkov/compare/3.2.235...3.2.236) - 2024-08-26

- no noteworthy changes

## [3.2.235](https://github.com/bridgecrewio/checkov/compare/3.2.234...3.2.235) - 2024-08-21

Expand Down
6 changes: 3 additions & 3 deletions checkov/secrets/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ def run(
for suppression in suppressions if suppression['suppressionType'] == 'SecretsPolicy']
if policies_list:
runnable_plugins: dict[str, str] = get_runnable_plugins(policies_list)
logging.info(f"Found {len(runnable_plugins)} runnable plugins")
logging.debug(f"Found {len(runnable_plugins)} runnable plugins")
if len(runnable_plugins) > 0:
plugins_index += 1
for name, runnable_plugin in runnable_plugins.items():
Expand All @@ -164,7 +164,7 @@ def run(
'path': f'file://{work_path}/runnable_plugin_{plugins_index}.py'
})
plugins_index += 1
logging.info(f"Loaded runnable plugin {name}")
logging.debug(f"Loaded runnable plugin {name}")
# load internal regex detectors
detector_path = f"{current_dir}/plugins/custom_regex_detector.py"
logging.info(f"Custom detector found at {detector_path}. Loading...")
Expand Down Expand Up @@ -390,7 +390,7 @@ def _safe_scan(file_path: str, base_path: str) -> tuple[str, list[PotentialSecre
try:
start_time = datetime.datetime.now()
file_results = [*scan.scan_file(full_file_path)]
logging.info(f'file {full_file_path} results len {len(file_results)}')
logging.debug(f'file {full_file_path} results len {len(file_results)}')
end_time = datetime.datetime.now()
run_time = end_time - start_time
if run_time > datetime.timedelta(seconds=10):
Expand Down
10 changes: 9 additions & 1 deletion checkov/terraform/checks/data/aws/IAMPrivilegeEscalation.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,15 @@ def __init__(self) -> None:
super().__init__(name=name, id=id)

def cloudsplaining_analysis(self, policy: PolicyDocument) -> Union[List[str], List[Dict[str, Any]]]:
return policy.allows_privilege_escalation
escalations = policy.allows_privilege_escalation
flattened_escalations: list[str] = []
if escalations:
for escalation in escalations:
if isinstance(escalation, dict):
flattened_escalations.extend(escalation.get('actions'))
else:
flattened_escalations.append(escalation)
return flattened_escalations


check = CloudSplainingPrivilegeEscalation()
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import fnmatch
import logging
from abc import ABC
from typing import Dict, List, Any
from typing import Dict, List, Any, Union

from cloudsplaining.scan.policy_document import PolicyDocument

Expand Down Expand Up @@ -28,3 +30,24 @@ def should_scan_conf(self, conf: Dict[str, List[Any]]) -> bool:
def convert_to_iam_policy(self, conf: Dict[str, List[Any]]) -> PolicyDocument:
converted_conf = convert_terraform_conf_to_iam_policy(conf)
return PolicyDocument(converted_conf)

def cloudsplaining_enrich_evaluated_keys(self, policy: PolicyDocument,
violating_actions: Union[List[str], List[Dict[str, Any]]]) -> None:
try:
# in case we have violating actions for this policy we start looking for it through the statements
for stmt_idx, statement in enumerate(policy.statements):
actions = statement.statement.get('Action') # get the actions for this statement
if actions:
if isinstance(actions, str):
for violating_action in violating_actions:
if fnmatch.fnmatch(violating_action.lower(), actions.lower()): # found the violating action in our list of actions
self.evaluated_keys.append(f"statement/[{stmt_idx}]/actions")
return
if isinstance(actions, list):
for action in actions: # go through the actions of this statement and try to match one violation
for violating_action in violating_actions:
if isinstance(action, str) and fnmatch.fnmatch(violating_action.lower(), action.lower()): # found the violating action in our list of actions
self.evaluated_keys.append(f"statement/[{stmt_idx}]/actions")
return
except Exception as e:
logging.warning(f'Failed enriching cloudsplaining evaluated keys due to: {e}')
10 changes: 9 additions & 1 deletion checkov/terraform/checks/resource/aws/IAMPrivilegeEscalation.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,15 @@ def __init__(self) -> None:
super().__init__(name=name, id=id)

def cloudsplaining_analysis(self, policy: PolicyDocument) -> Union[List[str], List[Dict[str, Any]]]:
return policy.allows_privilege_escalation
escalations = policy.allows_privilege_escalation
flattened_escalations: list[str] = []
if escalations:
for escalation in escalations:
if isinstance(escalation, dict):
flattened_escalations.extend(escalation.get('actions'))
else:
flattened_escalations.append(escalation)
return flattened_escalations


check = ResourceCloudSplainingPrivilegeEscalation()
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
from __future__ import annotations

import fnmatch
import logging
from abc import ABC
from typing import Dict, List, Any
from typing import Dict, List, Any, Union

from cloudsplaining.scan.policy_document import PolicyDocument

Expand Down Expand Up @@ -41,3 +43,24 @@ def convert_to_iam_policy(self, conf: Dict[str, Any]) -> PolicyDocument:
policy = conf['policy'][0]

return PolicyDocument(policy)

def cloudsplaining_enrich_evaluated_keys(self, policy: PolicyDocument,
violating_actions: Union[List[str], List[Dict[str, Any]]]) -> None:
try:
# in case we have violating actions for this policy we start looking for it through the statements
for stmt_idx, statement in enumerate(policy.statements):
actions = statement.statement.get('Action') # get the actions for this statement
if actions:
if isinstance(actions, str):
for violating_action in violating_actions:
if fnmatch.fnmatch(violating_action.lower(), actions.lower()): # found the violating action in our list of actions
self.evaluated_keys.append(f"policy/Statement/[{stmt_idx}]/Action")
return
if isinstance(actions, list):
for action in actions: # go through the actions of this statement and try to match one violation
for violating_action in violating_actions:
if isinstance(action, str) and fnmatch.fnmatch(violating_action.lower(), action.lower()): # found the violating action in our list of actions
self.evaluated_keys.append(f"policy/Statement/[{stmt_idx}]/Action")
return
except Exception as e:
logging.warning(f'Failed enriching cloudsplaining evaluated keys due to: {e}')
13 changes: 10 additions & 3 deletions checkov/terraform/checks/utils/base_cloudsplaining_iam_scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@ def scan_conf(self, conf: Dict[str, List[Any]]) -> CheckResult:
if self.cache_key not in BaseTerraformCloudsplainingIAMScanner.policy_document_cache.keys():
policy = self.convert_to_iam_policy(conf)
BaseTerraformCloudsplainingIAMScanner.policy_document_cache[self.cache_key] = policy
violations = self.cloudsplaining_analysis(
BaseTerraformCloudsplainingIAMScanner.policy_document_cache[self.cache_key]
)

policy_document: PolicyDocument = BaseTerraformCloudsplainingIAMScanner.policy_document_cache[self.cache_key]
violations = self.cloudsplaining_analysis(policy_document)
if violations and hasattr(self, 'evaluated_keys'):
self.cloudsplaining_enrich_evaluated_keys(policy_document, violations)
except Exception:
# this might occur with templated iam policies where ARN is not in place or similar
logging.debug(f"could not run cloudsplaining analysis on policy {conf}")
Expand All @@ -51,3 +53,8 @@ def convert_to_iam_policy(self, conf: Dict[str, List[Any]]) -> PolicyDocument:
@abstractmethod
def cloudsplaining_analysis(self, policy: PolicyDocument) -> Union[List[str], List[Dict[str, Any]]]:
raise NotImplementedError()

@abstractmethod
def cloudsplaining_enrich_evaluated_keys(self, policy: PolicyDocument,
violating_actions: Union[List[str], List[Dict[str, Any]]]) -> None:
raise NotImplementedError()
2 changes: 1 addition & 1 deletion checkov/version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version = '3.2.236'
version = '3.2.238'
2 changes: 1 addition & 1 deletion kubernetes/requirements.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
checkov==3.2.236
checkov==3.2.238
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ def test(self):
self.assertEqual(summary["skipped"], 0)
self.assertEqual(summary["parsing_errors"], 0)

self.assertEqual(['statement/[0]/actions'], report.failed_checks[0].check_result.get('evaluated_keys'))

self.assertEqual(passing_resources, passed_check_resources)
self.assertEqual(failing_resources, failed_check_resources)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ def test(self):
self.assertEqual(summary["skipped"], 0)
self.assertEqual(summary["parsing_errors"], 0)

self.assertEqual(['statement/[0]/actions'], report.failed_checks[0].check_result.get('evaluated_keys'))

self.assertEqual(passing_resources, passed_check_resources)
self.assertEqual(failing_resources, failed_check_resources)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ def test(self):
self.assertEqual(summary["skipped"], 0)
self.assertEqual(summary["parsing_errors"], 0)

self.assertEqual(['statement/[0]/actions'], report.failed_checks[0].check_result.get('evaluated_keys'))

self.assertEqual(passing_resources, passed_check_resources)
self.assertEqual(failing_resources, failed_check_resources)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ def test(self):
self.assertEqual(summary["skipped"], 0)
self.assertEqual(summary["parsing_errors"], 0)

self.assertEqual(['statement/[0]/actions'], report.failed_checks[0].check_result.get('evaluated_keys'))

self.assertEqual(passing_resources, passed_check_resources)
self.assertEqual(failing_resources, failed_check_resources)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ def test(self):
self.assertEqual(summary["skipped"], 0)
self.assertEqual(summary["parsing_errors"], 0)

self.assertEqual(['statement/[0]/actions'], report.failed_checks[0].check_result.get('evaluated_keys'))

self.assertEqual(passing_resources, passed_check_resources)
self.assertEqual(failing_resources, failed_check_resources)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ def test(self):
self.assertEqual(summary["skipped"], 0)
self.assertEqual(summary["parsing_errors"], 0)

self.assertEqual(report.failed_checks[0].check_result.get('evaluated_keys'), ['policy/Statement/[0]/Action'])

self.assertEqual(passing_resources, passed_check_resources)
self.assertEqual(failing_resources, failed_check_resources)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ def test(self):
self.assertEqual(summary["skipped"], 0)
self.assertEqual(summary["parsing_errors"], 0)

self.assertEqual(report.failed_checks[0].check_result.get('evaluated_keys'), ['policy/Statement/[0]/Action'])

self.assertEqual(passing_resources, passed_check_resources)
self.assertEqual(failing_resources, failed_check_resources)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ def test(self):
self.assertEqual(summary["skipped"], 0)
self.assertEqual(summary["parsing_errors"], 0)

self.assertEqual(report.failed_checks[0].check_result.get('evaluated_keys'), ['policy/Statement/[0]/Action'])

self.assertEqual(passing_resources, passed_check_resources)
self.assertEqual(failing_resources, failed_check_resources)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ def test(self):
self.assertEqual(summary["skipped"], 0)
self.assertEqual(summary["parsing_errors"], 0)

self.assertEqual(report.failed_checks[0].check_result.get('evaluated_keys'), ['policy/Statement/[0]/Action'])

self.assertEqual(passing_resources, passed_check_resources)
self.assertEqual(failing_resources, failed_check_resources)

Expand Down
2 changes: 2 additions & 0 deletions tests/terraform/checks/resource/aws/test_IAMWriteAccess.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ def test(self):
self.assertEqual(summary["skipped"], 0)
self.assertEqual(summary["parsing_errors"], 0)

self.assertEqual(report.failed_checks[0].check_result.get('evaluated_keys'), ['policy/Statement/[0]/Action'])

self.assertEqual(passing_resources, passed_check_resources)
self.assertEqual(failing_resources, failed_check_resources)

Expand Down

0 comments on commit a0a1b20

Please sign in to comment.