From b5c828b4e9201674e4e3a2740fdbaefcf3cf6bd8 Mon Sep 17 00:00:00 2001 From: Max Amelchenko Date: Wed, 28 Aug 2024 13:32:53 +0300 Subject: [PATCH] add support for provider in tf_plan framework --- checkov/terraform/checks/provider/base_check.py | 3 ++- checkov/terraform/checks/provider/registry.py | 2 ++ checkov/terraform/plan_parser.py | 12 ++++++++++-- checkov/terraform/plan_runner.py | 13 +++++++------ checkov/terraform/plan_utils.py | 2 +- 5 files changed, 22 insertions(+), 10 deletions(-) diff --git a/checkov/terraform/checks/provider/base_check.py b/checkov/terraform/checks/provider/base_check.py index a288fe172d8..65a9e4f9bd0 100644 --- a/checkov/terraform/checks/provider/base_check.py +++ b/checkov/terraform/checks/provider/base_check.py @@ -4,7 +4,7 @@ from checkov.common.checks.base_check import BaseCheck from checkov.common.models.enums import CheckCategories, CheckResult -from checkov.terraform.checks.provider.registry import provider_registry +from checkov.terraform.checks.provider.registry import provider_registry, plan_provider_registry class BaseProviderCheck(BaseCheck): @@ -26,6 +26,7 @@ def __init__( ) self.supported_provider = supported_provider provider_registry.register(self) + plan_provider_registry.register(self) def scan_entity_conf(self, conf: Dict[str, List[Any]], entity_type: str) -> CheckResult: return self.scan_provider_conf(conf) diff --git a/checkov/terraform/checks/provider/registry.py b/checkov/terraform/checks/provider/registry.py index 0d4f5084700..031e98d00bd 100644 --- a/checkov/terraform/checks/provider/registry.py +++ b/checkov/terraform/checks/provider/registry.py @@ -1,4 +1,6 @@ from checkov.common.bridgecrew.check_type import CheckType from checkov.terraform.checks.provider.base_registry import Registry +from checkov.terraform.checks.provider.plan_registry import Registry as PlanRegistry provider_registry = Registry(CheckType.TERRAFORM) +plan_provider_registry = PlanRegistry(CheckType.TERRAFORM_PLAN) diff --git a/checkov/terraform/plan_parser.py b/checkov/terraform/plan_parser.py index 7f2b103087d..efaf6ade052 100644 --- a/checkov/terraform/plan_parser.py +++ b/checkov/terraform/plan_parser.py @@ -8,7 +8,7 @@ from checkov.common.graph.graph_builder import CustomAttributes from checkov.common.parsers.node import ListNode -from checkov.common.util.consts import LINE_FIELD_NAMES, TRUE_AFTER_UNKNOWN +from checkov.common.util.consts import LINE_FIELD_NAMES, TRUE_AFTER_UNKNOWN, START_LINE, END_LINE from checkov.common.util.type_forcers import force_list from checkov.terraform.context_parsers.tf_plan import parse @@ -318,12 +318,20 @@ def _get_provider(template: dict[str, dict[str, Any]]) -> dict[str, dict[str, An # Not a provider, skip continue provider_map[provider_key] = {} + provider_alias = provider_data.get("alias", "default") + provider_map[provider_key][provider_alias] = {} + provider_map_entry = provider_map[provider_key][provider_alias] for field, value in provider_data.get('expressions', {}).items(): if field in LINE_FIELD_NAMES or not isinstance(value, dict): continue # don't care about line #s or non dicts expression_value = value.get('constant_value', None) if expression_value: - provider_map[provider_key][field] = expression_value + if isinstance(expression_value, str): + expression_value = [expression_value] + provider_map_entry[field] = expression_value + provider_map_entry['start_line'] = [provider_data.get(START_LINE, 1) - 1] + provider_map_entry['end_line'] = [provider_data.get(END_LINE, 1)] + provider_map_entry[TF_PLAN_RESOURCE_ADDRESS] = f"{provider_key}.{provider_alias}" return provider_map diff --git a/checkov/terraform/plan_runner.py b/checkov/terraform/plan_runner.py index e666ccc8ce7..6e48262e286 100644 --- a/checkov/terraform/plan_runner.py +++ b/checkov/terraform/plan_runner.py @@ -26,11 +26,13 @@ from checkov.runner_filter import RunnerFilter from checkov.terraform.base_runner import BaseTerraformRunner from checkov.terraform.checks.data.registry import data_registry +from checkov.terraform.checks.provider.registry import plan_provider_registry from checkov.terraform.checks.resource.registry import resource_registry from checkov.terraform.context_parsers.registry import parser_registry from checkov.terraform.plan_parser import TF_PLAN_RESOURCE_ADDRESS from checkov.terraform.plan_utils import create_definitions, build_definitions_context from checkov.terraform.deep_analysis_plan_graph_manager import DeepAnalysisGraphManager +from common.util import data_structures_utils _TerraformPlanContext: TypeAlias = "dict[str, dict[str, Any]]" _TerraformPlanDefinitions: TypeAlias = "dict[str, dict[str, Any]]" @@ -95,6 +97,7 @@ def __init__(self, graph_class: Type[TerraformLocalGraph] = TerraformLocalGraph, block_type_registries = { # noqa: CCE003 # a static attribute 'resource': resource_registry, 'data': data_registry, + 'provider': plan_provider_registry } def run( @@ -199,8 +202,8 @@ def check_tf_definition( logging.debug(f"Scanning file: {scanned_file}") for block_type in definition.keys(): if block_type in self.block_type_registries.keys(): - self.run_block(definition[block_type], None, full_file_path, root_folder, report, scanned_file, - block_type, runner_filter) + self.run_block(definition[block_type], self.context, full_file_path, root_folder, + report, scanned_file, block_type, runner_filter) @staticmethod def _get_file_path(full_file_path: TFDefinitionKeyType, root_folder: str | pathlib.Path) -> tuple[str, str]: @@ -237,8 +240,8 @@ def run_block( entity_context = self.get_entity_context(definition_path, full_file_path, entity) entity_lines_range = [entity_context.get('start_line', 1), entity_context.get('end_line', 1)] entity_code_lines = entity_context.get('code_lines', []) - entity_address = entity_context['address'] _, _, entity_config = registry.extract_entity_details(entity) + entity_address = entity_context.get('address') or entity_context.get(CustomAttributes.TF_RESOURCE_ADDRESS) self._assign_graph_to_registry(registry) results = registry.scan(scanned_file, entity, [], runner_filter, report_type=CheckType.TERRAFORM_PLAN) @@ -290,9 +293,7 @@ def get_entity_context_and_evaluations(self, entity: dict[str, Any]) -> dict[str raw_context['definition_path'] = entity[CustomAttributes.BLOCK_NAME].split('.') return raw_context - def get_entity_context( - self, definition_path: list[str], full_file_path: str, entity: dict[str, Any] - ) -> dict[str, Any]: + def get_entity_context(self, definition_path: list[str], full_file_path: str, entity: dict[str, Any]) -> dict[str, Any]: if not self.context: return {} diff --git a/checkov/terraform/plan_utils.py b/checkov/terraform/plan_utils.py index d2003662555..cae35104458 100644 --- a/checkov/terraform/plan_utils.py +++ b/checkov/terraform/plan_utils.py @@ -65,7 +65,7 @@ def build_definitions_context( definitions_raw: Dict[str, List[Tuple[int, str]]] ) -> Dict[str, Dict[str, Any]]: definitions_context: dict[str, dict[str, Any]] = defaultdict(dict) - supported_block_types = ("data", "resource") + supported_block_types = ("data", "resource", "provider") for full_file_path, definition in definitions.items(): for block_type in supported_block_types: entities = definition.get(block_type, [])