Skip to content

Commit

Permalink
add support for provider in tf_plan framework
Browse files Browse the repository at this point in the history
  • Loading branch information
Max Amelchenko committed Aug 28, 2024
1 parent 91932f8 commit b5c828b
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 10 deletions.
3 changes: 2 additions & 1 deletion checkov/terraform/checks/provider/base_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from checkov.common.checks.base_check import BaseCheck
from checkov.common.models.enums import CheckCategories, CheckResult
from checkov.terraform.checks.provider.registry import provider_registry
from checkov.terraform.checks.provider.registry import provider_registry, plan_provider_registry


class BaseProviderCheck(BaseCheck):
Expand All @@ -26,6 +26,7 @@ def __init__(
)
self.supported_provider = supported_provider
provider_registry.register(self)
plan_provider_registry.register(self)

def scan_entity_conf(self, conf: Dict[str, List[Any]], entity_type: str) -> CheckResult:
return self.scan_provider_conf(conf)
Expand Down
2 changes: 2 additions & 0 deletions checkov/terraform/checks/provider/registry.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
from checkov.common.bridgecrew.check_type import CheckType
from checkov.terraform.checks.provider.base_registry import Registry
from checkov.terraform.checks.provider.plan_registry import Registry as PlanRegistry

provider_registry = Registry(CheckType.TERRAFORM)
plan_provider_registry = PlanRegistry(CheckType.TERRAFORM_PLAN)
12 changes: 10 additions & 2 deletions checkov/terraform/plan_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

from checkov.common.graph.graph_builder import CustomAttributes
from checkov.common.parsers.node import ListNode
from checkov.common.util.consts import LINE_FIELD_NAMES, TRUE_AFTER_UNKNOWN
from checkov.common.util.consts import LINE_FIELD_NAMES, TRUE_AFTER_UNKNOWN, START_LINE, END_LINE
from checkov.common.util.type_forcers import force_list
from checkov.terraform.context_parsers.tf_plan import parse

Expand Down Expand Up @@ -318,12 +318,20 @@ def _get_provider(template: dict[str, dict[str, Any]]) -> dict[str, dict[str, An
# Not a provider, skip
continue
provider_map[provider_key] = {}
provider_alias = provider_data.get("alias", "default")
provider_map[provider_key][provider_alias] = {}
provider_map_entry = provider_map[provider_key][provider_alias]
for field, value in provider_data.get('expressions', {}).items():
if field in LINE_FIELD_NAMES or not isinstance(value, dict):
continue # don't care about line #s or non dicts
expression_value = value.get('constant_value', None)
if expression_value:
provider_map[provider_key][field] = expression_value
if isinstance(expression_value, str):
expression_value = [expression_value]
provider_map_entry[field] = expression_value
provider_map_entry['start_line'] = [provider_data.get(START_LINE, 1) - 1]
provider_map_entry['end_line'] = [provider_data.get(END_LINE, 1)]
provider_map_entry[TF_PLAN_RESOURCE_ADDRESS] = f"{provider_key}.{provider_alias}"

return provider_map

Expand Down
13 changes: 7 additions & 6 deletions checkov/terraform/plan_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@
from checkov.runner_filter import RunnerFilter
from checkov.terraform.base_runner import BaseTerraformRunner
from checkov.terraform.checks.data.registry import data_registry
from checkov.terraform.checks.provider.registry import plan_provider_registry
from checkov.terraform.checks.resource.registry import resource_registry
from checkov.terraform.context_parsers.registry import parser_registry
from checkov.terraform.plan_parser import TF_PLAN_RESOURCE_ADDRESS
from checkov.terraform.plan_utils import create_definitions, build_definitions_context
from checkov.terraform.deep_analysis_plan_graph_manager import DeepAnalysisGraphManager
from common.util import data_structures_utils

_TerraformPlanContext: TypeAlias = "dict[str, dict[str, Any]]"
_TerraformPlanDefinitions: TypeAlias = "dict[str, dict[str, Any]]"
Expand Down Expand Up @@ -95,6 +97,7 @@ def __init__(self, graph_class: Type[TerraformLocalGraph] = TerraformLocalGraph,
block_type_registries = { # noqa: CCE003 # a static attribute
'resource': resource_registry,
'data': data_registry,
'provider': plan_provider_registry
}

def run(
Expand Down Expand Up @@ -199,8 +202,8 @@ def check_tf_definition(
logging.debug(f"Scanning file: {scanned_file}")
for block_type in definition.keys():
if block_type in self.block_type_registries.keys():
self.run_block(definition[block_type], None, full_file_path, root_folder, report, scanned_file,
block_type, runner_filter)
self.run_block(definition[block_type], self.context, full_file_path, root_folder,
report, scanned_file, block_type, runner_filter)

@staticmethod
def _get_file_path(full_file_path: TFDefinitionKeyType, root_folder: str | pathlib.Path) -> tuple[str, str]:
Expand Down Expand Up @@ -237,8 +240,8 @@ def run_block(
entity_context = self.get_entity_context(definition_path, full_file_path, entity)
entity_lines_range = [entity_context.get('start_line', 1), entity_context.get('end_line', 1)]
entity_code_lines = entity_context.get('code_lines', [])
entity_address = entity_context['address']
_, _, entity_config = registry.extract_entity_details(entity)
entity_address = entity_context.get('address') or entity_context.get(CustomAttributes.TF_RESOURCE_ADDRESS)

self._assign_graph_to_registry(registry)
results = registry.scan(scanned_file, entity, [], runner_filter, report_type=CheckType.TERRAFORM_PLAN)
Expand Down Expand Up @@ -290,9 +293,7 @@ def get_entity_context_and_evaluations(self, entity: dict[str, Any]) -> dict[str
raw_context['definition_path'] = entity[CustomAttributes.BLOCK_NAME].split('.')
return raw_context

def get_entity_context(
self, definition_path: list[str], full_file_path: str, entity: dict[str, Any]
) -> dict[str, Any]:
def get_entity_context(self, definition_path: list[str], full_file_path: str, entity: dict[str, Any]) -> dict[str, Any]:
if not self.context:
return {}

Expand Down
2 changes: 1 addition & 1 deletion checkov/terraform/plan_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def build_definitions_context(
definitions_raw: Dict[str, List[Tuple[int, str]]]
) -> Dict[str, Dict[str, Any]]:
definitions_context: dict[str, dict[str, Any]] = defaultdict(dict)
supported_block_types = ("data", "resource")
supported_block_types = ("data", "resource", "provider")
for full_file_path, definition in definitions.items():
for block_type in supported_block_types:
entities = definition.get(block_type, [])
Expand Down

0 comments on commit b5c828b

Please sign in to comment.