Commit

Merge branch 'main' of https://github.com/bridgecrewio/checkov into custom-policies-check-all-frameworks-for-empty-framework

SteveVaknin committed Jul 28, 2024
2 parents 0f07b74 + 73a909b commit 710cb6b
Showing 44 changed files with 290 additions and 100 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/pr-test.yml
@@ -42,7 +42,7 @@ jobs:
id: changed-files-specific
uses: tj-actions/changed-files@eaf854ef0c266753e1abec356dcf17d92695b251 # v44
with:
files: tests/cloudformation/checks/resource/aws/**/*
files: tests/cloudformation/checks/resource/aws/example*/**/*
- name: Install cfn-lint
if: steps.changed-files-specific.outputs.any_changed == 'true'
run: |
18 changes: 17 additions & 1 deletion CHANGELOG.md
@@ -1,6 +1,22 @@
# CHANGELOG

## [Unreleased](https://github.com/bridgecrewio/checkov/compare/3.2.204...HEAD)
## [Unreleased](https://github.com/bridgecrewio/checkov/compare/3.2.208...HEAD)

## [3.2.208](https://github.com/bridgecrewio/checkov/compare/3.2.204...3.2.208) - 2024-07-25

### Feature

- **general:** filter resources by provider for all resource types - [#6598](https://github.com/bridgecrewio/checkov/pull/6598)
- **secrets:** add CKV_SECRET_192 to GENERIC_PRIVATE_KEY_CHECK_IDS - [#6600](https://github.com/bridgecrewio/checkov/pull/6600)
- **terraform:** Update ckv-aws-8 policy - support unknown statement - [#6596](https://github.com/bridgecrewio/checkov/pull/6596)

### Bug Fix

- **terraform:** Fix resource type for CKV_AZURE_242 - [#6599](https://github.com/bridgecrewio/checkov/pull/6599)

### Platform

- **general:** handle multiple values for the same metadata filter - [#6604](https://github.com/bridgecrewio/checkov/pull/6604)

## [3.2.204](https://github.com/bridgecrewio/checkov/compare/3.2.201...3.2.204) - 2024-07-24

10 changes: 5 additions & 5 deletions README.md
@@ -251,14 +251,14 @@ checkov -d . --skip-check kube-system
Run a scan of a container image. First pull or build the image, then refer to it by hash, ID, or name:tag:
```sh
checkov --framework sca_image --docker-image sha256:1234example --dockerfile-path /Users/path/to/Dockerfile --bc-api-key ...
checkov --framework sca_image --docker-image sha256:1234example --dockerfile-path /Users/path/to/Dockerfile --repo-id ... --bc-api-key ...
checkov --docker-image <image-name>:tag --dockerfile-path /User/path/to/Dockerfile --bc-api-key ...
checkov --docker-image <image-name>:tag --dockerfile-path /User/path/to/Dockerfile --repo-id ... --bc-api-key ...
```
You can also use the --image flag as a shorter alias for --docker-image when scanning a container image:
```sh
checkov --image <image-name>:tag --dockerfile-path /User/path/to/Dockerfile --bc-api-key ...
checkov --image <image-name>:tag --dockerfile-path /User/path/to/Dockerfile --repo-id ... --bc-api-key ...
```
Run an SCA scan of packages in a repo:
@@ -278,12 +278,12 @@ checkov -d .
Run secrets scanning on all files in MyDirectory. Skip the CKV_SECRET_6 check for JSON files whose names end with DontScan
```sh
checkov -d /MyDirectory --framework secrets --bc-api-key ... --skip-check CKV_SECRET_6:.*DontScan.json$
checkov -d /MyDirectory --framework secrets --repo-id ... --bc-api-key ... --skip-check CKV_SECRET_6:.*DontScan.json$
```
Run secrets scanning on all files in MyDirectory. Skip the CKV_SECRET_6 check for JSON files whose path contains "skip_test"
```sh
checkov -d /MyDirectory --framework secrets --bc-api-key ... --skip-check CKV_SECRET_6:.*skip_test.*json$
checkov -d /MyDirectory --framework secrets --repo-id ... --bc-api-key ... --skip-check CKV_SECRET_6:.*skip_test.*json$
```
You can mask values in scan results by supplying a configuration file (using the --config-file flag) with a mask entry.
@@ -7,7 +7,7 @@

class AzureMLWorkspacePrivateEndpoint(BaseResourceCheck):
def __init__(self) -> None:
name = "Ensure Azure Machine learning workspace is not configured with private endpoint"
name = "Ensure Azure Machine learning workspace is configured with private endpoint"
id = "CKV_AZURE_243"
supported_resources = ["Microsoft.MachineLearningServices/workspaces"]
categories = [CheckCategories.NETWORKING]
@@ -1,5 +1,6 @@
from __future__ import annotations

import fnmatch
import json
import logging
from abc import abstractmethod
@@ -66,6 +67,7 @@ def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult:
if statement_key in converted_policy_doc:
policy_statement = PolicyDocument(converted_policy_doc)
self.policy_document_cache[self.entity_path][policy.get("PolicyName")] = policy_statement
self.cloudsplaining_enrich_evaluated_keys(policy_statement)
violations = self.cloudsplaining_analysis(policy_statement)
if violations:
logging.debug(f"detailed cloudsplaining finding: {json.dumps(violations)}")
@@ -79,3 +81,27 @@ def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult:
@abstractmethod
def cloudsplaining_analysis(self, policy: PolicyDocument) -> list[str]:
raise NotImplementedError()

def cloudsplaining_enrich_evaluated_keys(self, policy: PolicyDocument) -> None:
try:
violating_actions = self.cloudsplaining_analysis(policy)
if violating_actions:
# in case we have violating actions for this policy we start looking for it through the statements
for stmt_idx, statement in enumerate(policy.statements):
actions = statement.statement.get('Action') # get the actions for this statement
if actions:
if isinstance(actions, str):
for violating_action in violating_actions:
if fnmatch.fnmatch(violating_action, actions): # found the violating action in our list of actions
self.evaluated_keys = [f"Properties/PolicyDocument/Statement/[{stmt_idx}]/Action"]
break
if isinstance(actions, list):
for action_idx, action in enumerate(actions): # go through the actions of this statement and try to match one violation
for violating_action in violating_actions:
if fnmatch.fnmatch(violating_action, action): # found the violating action in our list of actions
self.evaluated_keys.append(
f"Properties/PolicyDocument/Statement/[{stmt_idx}]/Action/[{action_idx}]/"
)
break
except Exception as e:
logging.warning(f'Failed enriching cloudsplaining evaluated keys due to: {e}')
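For context, a minimal sketch of the fnmatch matching the new enrichment method relies on; the action names below are illustrative, not taken from a real policy:

```python
import fnmatch

# A statement's Action may be a shell-style wildcard pattern, while
# cloudsplaining reports concrete violating actions; fnmatch bridges the two.
statement_action = "iam:*"           # pattern as written in the policy statement
violating_action = "iam:PassRole"    # concrete action flagged as a violation

print(fnmatch.fnmatch(violating_action, statement_action))  # True
```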
14 changes: 10 additions & 4 deletions checkov/cloudformation/checks/resource/aws/ECRPolicy.py
@@ -48,11 +48,17 @@ def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult:
if "Statement" in policy_text.keys() and isinstance(policy_text["Statement"], list):
for statement_index, statement in enumerate(policy_text["Statement"]):
if "Principal" in statement.keys():
for principal_index, principal in enumerate(statement["Principal"]):
principal_block = statement["Principal"]
evaluated_key = f"Properties/RepositoryPolicyText/Statement/[{statement_index}]/Principal"
if isinstance(principal_block, dict) and 'AWS' in principal_block:
principal_block = principal_block['AWS']
evaluated_key += "/AWS"
for principal_index, principal in enumerate(principal_block):
if principal == "*" and not self.check_for_constrained_condition(statement):
self.evaluated_keys = [
f"Properties/RepositoryPolicyText/Statement/[{statement_index}]/Principal/[{principal_index}]"
]
if isinstance(principal_block, list):
self.evaluated_keys = [f"{evaluated_key}/[{principal_index}]/"]
else:
self.evaluated_keys = [f"{evaluated_key}"]
return CheckResult.FAILED
return CheckResult.PASSED

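For reference, a small sketch of the evaluated-key paths the updated check produces for the two Principal shapes it now handles; the values are illustrative only:

```python
# Principal given as a dict keyed by "AWS" versus as a bare list of principals.
principal_as_dict = {"AWS": ["*"]}
principal_as_list = ["*"]

statement_index, principal_index = 0, 0
evaluated_key = f"Properties/RepositoryPolicyText/Statement/[{statement_index}]/Principal"

# dict form: the "/AWS" segment is appended before indexing into the list
print(f"{evaluated_key}/AWS/[{principal_index}]/")
# list form: the list is indexed directly
print(f"{evaluated_key}/[{principal_index}]/")
```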
@@ -9,8 +9,15 @@ def __init__(self):
super().__init__(name=name, id=id)

def cloudsplaining_analysis(self, policy):
escalation = policy.allows_privilege_escalation
return escalation
escalations = policy.allows_privilege_escalation
flattened_escalations: list[str] = []
if escalations:
for escalation in escalations:
if isinstance(escalation, dict):
flattened_escalations.extend(escalation.get('actions'))
else:
flattened_escalations.append(escalation)
return flattened_escalations


check = cloudsplainingPrivilegeEscalation()
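A self-contained sketch of the flattening above. The mixed element shapes (dicts carrying an "actions" list alongside plain strings) are an assumption about what allows_privilege_escalation may return, not verified against cloudsplaining:

```python
escalations = [
    {"type": "CreateAccessKey", "actions": ["iam:createaccesskey"]},  # assumed dict shape
    "iam:passrole",                                                   # assumed plain string
]

flattened: list[str] = []
for escalation in escalations:
    if isinstance(escalation, dict):
        flattened.extend(escalation.get("actions", []))
    else:
        flattened.append(escalation)

print(flattened)  # ['iam:createaccesskey', 'iam:passrole']
```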
26 changes: 19 additions & 7 deletions checkov/common/bridgecrew/platform_integration.py
@@ -58,7 +58,7 @@
REQUEST_READ_TIMEOUT,
REQUEST_RETRIES,
)
from checkov.common.util.type_forcers import convert_prisma_policy_filter_to_dict, convert_str_to_bool
from checkov.common.util.type_forcers import convert_prisma_policy_filter_to_params, convert_str_to_bool
from checkov.version import version as checkov_version

if TYPE_CHECKING:
@@ -1165,16 +1165,17 @@ def get_prisma_policies_for_filter(self, policy_filter: str) -> dict[Any, Any] |
return filtered_policies

logging.debug(f'Prisma policy URL: {self.prisma_policies_url}')
query_params = convert_prisma_policy_filter_to_dict(policy_filter)
query_params = convert_prisma_policy_filter_to_params(policy_filter)
if self.is_valid_policy_filter(query_params, valid_filters=self.get_prisma_policy_filters()):
# If enabled and subtype are not explicitly set, use the only acceptable values.
query_params['policy.enabled'] = True
query_params['policy.subtype'] = 'build'
self.add_static_policy_filters(query_params)
logging.debug(f'Filter query params: {query_params}')

request = self.http.request( # type:ignore[no-untyped-call]
"GET",
self.prisma_policies_url,
headers=headers,
fields=query_params,
fields=tuple(query_params),
)
logging.debug("Got Prisma build policy metadata")
filtered_policies = json.loads(request.data.decode("utf8"))
@@ -1184,6 +1185,17 @@ def get_prisma_policies_for_filter(self, policy_filter: str) -> dict[Any, Any] |
f"Failed to get prisma build policy metadata from {self.prisma_policies_url}{response_message}", exc_info=True)
return filtered_policies

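For context, a minimal sketch of why the request now passes `fields=tuple(query_params)`: for GET requests urllib3 url-encodes the fields, and a sequence of pairs preserves repeated filter names where a dict could not. The URL below is a placeholder:

```python
import urllib3

http = urllib3.PoolManager()
params = (("policy.label", "label1"), ("policy.label", "label2"), ("policy.subtype", "build"))

# urllib3 encodes the pairs into the query string, keeping both policy.label entries.
response = http.request("GET", "https://example.com/v2/policies", fields=params)
print(response.status)
```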
@staticmethod
def add_static_policy_filters(query_params: list[tuple[str, str]]) -> list[tuple[str, str]]:
"""
Adds policy.enabled = true, policy.subtype = build to the query params, if these are not already present. Modifies the list in place and also returns it.
"""
if not any(p[0] == 'policy.enabled' for p in query_params):
query_params.append(('policy.enabled', 'true'))
if not any(p[0] == 'policy.subtype' for p in query_params):
query_params.append(('policy.subtype', 'build'))
return query_params

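A small, standalone usage sketch of the defaulting performed by add_static_policy_filters; the logic is restated inline so the example runs on its own, and the filter values are made up:

```python
from typing import List, Tuple

def add_static_policy_filters(query_params: List[Tuple[str, str]]) -> List[Tuple[str, str]]:
    # same defaulting as the static method above, restated for a runnable example
    if not any(p[0] == 'policy.enabled' for p in query_params):
        query_params.append(('policy.enabled', 'true'))
    if not any(p[0] == 'policy.subtype' for p in query_params):
        query_params.append(('policy.subtype', 'build'))
    return query_params

params = [("policy.label", "label1")]
add_static_policy_filters(params)
print(params)
# [('policy.label', 'label1'), ('policy.enabled', 'true'), ('policy.subtype', 'build')]
```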
def get_prisma_policy_filters(self) -> Dict[str, Dict[str, Any]]:
request = None
try:
@@ -1211,7 +1223,7 @@ def get_prisma_policy_filters(self) -> Dict[str, Dict[str, Any]]:
return {}

@staticmethod
def is_valid_policy_filter(policy_filter: dict[str, str], valid_filters: dict[str, dict[str, Any]] | None = None) -> bool:
def is_valid_policy_filter(policy_filter: list[tuple[str, str]], valid_filters: dict[str, dict[str, Any]] | None = None) -> bool:
"""
Validates only the filter names
"""
@@ -1221,7 +1233,7 @@ def is_valid_policy_filter(policy_filter: dict[str, str], valid_filters: dict[st
return False
if not valid_filters:
return False
for filter_name, filter_value in policy_filter.items():
for filter_name, filter_value in policy_filter:
if filter_name not in valid_filters.keys():
logging.warning(f"Invalid filter name: {filter_name}")
logging.warning(f"Available filter names: {', '.join(valid_filters.keys())}")
1 change: 1 addition & 0 deletions checkov/common/checks_infra/checks_parser.py
@@ -204,6 +204,7 @@ def parse_raw_check(self, raw_check: Dict[str, Dict[str, Any]], **kwargs: Any) -
check.guideline = raw_check.get("metadata", {}).get("guideline")
check.check_path = kwargs.get("check_path", "")
solver = self.get_check_solver(check)
solver.providers = providers
check.set_solver(solver)

return check
@@ -51,7 +51,7 @@ def run(self, graph_connector: LibraryGraph) -> Tuple[List[Dict[str, Any]], List

if isinstance(graph_connector, DiGraph):
for _, data in graph_connector.nodes(data=True):
if (not self.resource_types or data.get(CustomAttributes.RESOURCE_TYPE) in self.resource_types) \
if self.resource_type_pred(data, self.resource_types) \
and data.get(CustomAttributes.BLOCK_TYPE) in SUPPORTED_BLOCK_TYPES:
jobs.append(executer.submit(
self._process_node, data, passed_vertices, failed_vertices, unknown_vertices))
@@ -60,8 +60,7 @@ def run(self, graph_connector: LibraryGraph) -> Tuple[List[Dict[str, Any]], List
return passed_vertices, failed_vertices, unknown_vertices

for _, data in graph_connector.nodes():
if (not self.resource_types or data.get(CustomAttributes.RESOURCE_TYPE) in self.resource_types) \
and data.get(CustomAttributes.BLOCK_TYPE) in SUPPORTED_BLOCK_TYPES:
if self.resource_type_pred(data, self.resource_types) and data.get(CustomAttributes.BLOCK_TYPE) in SUPPORTED_BLOCK_TYPES:
jobs.append(executer.submit(
self._process_node, data, passed_vertices, failed_vertices, unknown_vertices))

39 changes: 32 additions & 7 deletions checkov/common/graph/checks_infra/solvers/base_solver.py
@@ -4,17 +4,24 @@
from typing import Tuple, List, Dict, Any, TYPE_CHECKING

from checkov.common.graph.checks_infra.enums import SolverType
from checkov.common.graph.graph_builder import CustomAttributes
from checkov.common.util.env_vars_config import env_vars_config

if TYPE_CHECKING:
from networkx import DiGraph

# Based on the resource names in iac frameworks
AWS_KEYS = ['aws_', 'AWS::', 'aws-']
GCP_KEYS = ['gcloud', 'google_']
AZURE_KEYS = ['azurerm_', 'Microsoft.']


class BaseSolver:
operator = "" # noqa: CCE003 # a static attribute

def __init__(self, solver_type: SolverType) -> None:
self.solver_type = solver_type
self.providers: List[str] = []

@abstractmethod
def get_operation(self, *args: Any, **kwargs: Any) -> Any:
@@ -28,12 +35,30 @@ def _get_operation(self, *args: Any, **kwargs: Any) -> Any:
def run(self, graph_connector: DiGraph) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]], List[Dict[str, Any]]]:
raise NotImplementedError()

@staticmethod
def resource_type_pred(v: Dict[str, Any], resource_types: List[str]) -> bool:
def resource_type_pred(self, v: Dict[str, Any], resource_types: List[str]) -> bool:
resource_type = CustomAttributes.RESOURCE_TYPE
if env_vars_config.CKV_SUPPORT_ALL_RESOURCE_TYPE:
is_all_resources = isinstance(resource_types, list) and resource_types[0].lower() == "all"
support_all_resources = bool("resource_type" in v and is_all_resources and v[
"resource_type"] != 'module')
else:
support_all_resources = False
return not resource_types or ("resource_type" in v and v["resource_type"] in resource_types) or support_all_resources
resource_type_match_provider = self.resource_match_provider(v.get(resource_type, ''))
support_all_resources = bool(resource_type in v and is_all_resources and v.get(resource_type) != 'module' and resource_type_match_provider)

return not resource_types or support_all_resources

return not resource_types or (resource_type in v and v[resource_type] in resource_types)

def resource_match_provider(self, resource_type: str) -> bool:
if not self.providers:
return True
for provider in self.providers:
if provider.lower() == 'aws':
if any(resource_type.startswith(key) for key in AWS_KEYS):
return True
elif provider.lower() == 'gcp':
if any(resource_type.startswith(key) for key in GCP_KEYS):
return True
elif provider.lower() == 'azure':
if any(resource_type.startswith(key) for key in AZURE_KEYS):
return True
else: # if we don't have a provider or the provider was not one of the basic providers
return True
return False
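A short, self-contained illustration of the provider-prefix matching introduced above; the resource type names are examples and the helper is a simplified single-provider variant of resource_match_provider:

```python
from typing import List, Optional

AWS_KEYS = ['aws_', 'AWS::', 'aws-']
GCP_KEYS = ['gcloud', 'google_']
AZURE_KEYS = ['azurerm_', 'Microsoft.']

def matches_provider(resource_type: str, provider: str) -> bool:
    keys: Optional[List[str]] = {'aws': AWS_KEYS, 'gcp': GCP_KEYS, 'azure': AZURE_KEYS}.get(provider.lower())
    if keys is None:  # unknown provider -> do not filter the resource out
        return True
    return any(resource_type.startswith(key) for key in keys)

print(matches_provider("aws_s3_bucket", "aws"))            # True  (Terraform)
print(matches_provider("AWS::S3::Bucket", "aws"))          # True  (CloudFormation)
print(matches_provider("azurerm_storage_account", "aws"))  # False
```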
4 changes: 2 additions & 2 deletions checkov/common/util/ext_argument_parser.py
@@ -469,7 +469,7 @@ def add_parser_args(self) -> None:
"When used with --policy-metadata-filter-exception, the exceptions override any policies selected as"
"a result of the --policy-metadata-filter flag."
"See https://prisma.pan.dev/api/cloud/cspm/policy#operation/get-policy-filters-and-options for "
"information on allowed filters. Format: policy.label=test,cloud.type=aws ",
"information on allowed filters. Example: policy.label=label1,policy.label=label2,cloud.type=aws",
default=None,
)
self.add(
@@ -478,7 +478,7 @@ def add_parser_args(self) -> None:
"When used with --policy-metadata-filter, the exceptions override any policies selected as"
"a result of the --policy-metadata-filter flag."
"See https://prisma.pan.dev/api/cloud/cspm/policy#operation/get-policy-filters-and-options for "
"information on allowed filters. Format: policy.label=test,cloud.type=aws ",
"information on allowed filters. Example: policy.label=label1,policy.label=label2,cloud.type=aws",
default=None,
)
self.add(
22 changes: 14 additions & 8 deletions checkov/common/util/type_forcers.py
@@ -4,7 +4,7 @@
import logging
import typing
from json import JSONDecodeError
from typing import TypeVar, overload, Any, Dict
from typing import TypeVar, overload, Any, Tuple, List

import yaml

@@ -130,20 +130,26 @@ def convert_csv_string_arg_to_list(csv_string_arg: list[str] | str | None) -> li
return csv_string_arg


def convert_prisma_policy_filter_to_dict(filter_string: str) -> Dict[Any, Any]:
def convert_prisma_policy_filter_to_params(filter_string: str) -> List[Tuple[str, str]]:
"""
Converts the filter string to a dict. For example:
Converts the filter string to a list of tuples. For example:
'policy.label=label1,cloud.type=aws' becomes -->
{'policy.label': 'label1', 'cloud.type': 'aws'}
Note that the API does not accept lists https://prisma.pan.dev/api/cloud/cspm/policy#operation/get-policies-v2
This is not allowed: policy.label=label1,label2
[('policy.label', 'label1'), ('cloud.type', 'aws')]
Multiple values for the same attribute, like policy.label, will be separate items in the tuple. For example,
'policy.label=label1,policy.label=anotherlabel' becomes -->
[('policy.label', 'label1'), ('policy.label', 'anotherlabel')]
Note that the urllib3 library seems to work best with tuples only (not lists), so this result may need to be converted.
It is returned as a list so that it can be modified separately, and converted to a tuple only when ready
"""
filter_params = {}
filter_params: List[Tuple[str, str]] = []
if isinstance(filter_string, str) and filter_string:
for f in filter_string.split(','):
try:
f_name, f_value = f.split('=')
filter_params[f_name.strip()] = f_value.strip()
filter_params.append((f_name.strip(), f_value.strip()))
except (IndexError, ValueError) as e:
logging.debug(f"Invalid filter format: {e}")

return filter_params
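A hypothetical round trip showing why the result is a list of pairs rather than a dict: when the pairs are eventually url-encoded, repeated filter names such as policy.label are preserved. The filter values are made up, and urlencode is used here only to illustrate the encoding behaviour:

```python
from urllib.parse import urlencode

# What convert_prisma_policy_filter_to_params('policy.label=label1,policy.label=label2,cloud.type=aws')
# would return:
params = [('policy.label', 'label1'), ('policy.label', 'label2'), ('cloud.type', 'aws')]

# A dict would collapse the two policy.label entries; a sequence of pairs keeps both.
print(urlencode(params))
# policy.label=label1&policy.label=label2&cloud.type=aws
```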