Skip to content

Commit

Permalink
feat(terraform): Remove old tf parser (#5420)
Browse files Browse the repository at this point in the history
* Removed old parser usage and fixed all tests besides module_dependency

* Removed unneeded Module fields like module_dependency_map

* removed TFBlock fields like module_dependency and module_dependency_num

* Added TODO to make sure I don't forget to remove TFDefinitionKeyType

* Fixed tests with local related paths

* Fixed runner_registry usage of tf-definitions

* Fixed the last failing test to match the new parser

* Fixed all mypy errors but 1 in runner registry

* Mypy passes, but the tests do not yet

* Middle of debugging tests

* Fixed remaining test_runner tests

* Fixed tests which specifically used str instead of TFDefinitionKey

* Deleted test which used old definitions

* Minimized usage of TFDefinitionKeyType

* flake8

* hopefully fix import linter

* fixed "type is not subscriptable" error
  • Loading branch information
bo156 committed Aug 16, 2023
1 parent a8a9c5e commit f6751f2
Show file tree
Hide file tree
Showing 36 changed files with 508 additions and 2,033 deletions.
4 changes: 2 additions & 2 deletions checkov/common/graph/graph_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@

if TYPE_CHECKING:
from checkov.common.graph.graph_builder.local_graph import LocalGraph # noqa
from checkov.terraform.parser import Parser
from checkov.terraform.tf_parser import TFParser
from checkov.common.typing import LibraryGraph, LibraryGraphConnector

_LocalGraph = TypeVar("_LocalGraph", bound="LocalGraph[Any]")
_Definitions = TypeVar("_Definitions")


class GraphManager(Generic[_LocalGraph, _Definitions]):
def __init__(self, db_connector: LibraryGraphConnector, parser: Parser | None, source: str = "") -> None:
def __init__(self, db_connector: LibraryGraphConnector, parser: TFParser | None, source: str = "") -> None:
self.db_connector = db_connector
self.source = source
self.parser = parser
Expand Down
14 changes: 7 additions & 7 deletions checkov/common/runners/runner_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,12 @@
from checkov.common.util import data_structures_utils
from checkov.common.util.banner import tool as tool_name
from checkov.common.util.json_utils import CustomJSONEncoder
from checkov.common.util.parser_utils import strip_terraform_module_referrer
from checkov.common.util.secrets_omitter import SecretsOmitter
from checkov.common.util.type_forcers import convert_csv_string_arg_to_list, force_list
from checkov.sca_image.runner import Runner as image_runner
from checkov.common.secrets.consts import SECRET_VALIDATION_STATUSES
from checkov.terraform.context_parsers.registry import parser_registry
from checkov.terraform.parser import Parser
from checkov.terraform.tf_parser import TFParser

if TYPE_CHECKING:
from checkov.common.output.baseline import Baseline
Expand Down Expand Up @@ -655,24 +654,25 @@ def enrich_report_with_guidelines(scan_report: Report) -> None:
def get_enriched_resources(
repo_roots: list[str | Path], download_external_modules: bool
) -> dict[str, dict[str, Any]]:
from checkov.terraform.modules.module_objects import TFDefinitionKey

repo_definitions = {}
for repo_root in repo_roots:
tf_definitions: dict[str, Any] = {}
parsing_errors: dict[str, Exception] = {}
repo_root = os.path.abspath(repo_root)
Parser().parse_directory(
tf_definitions: dict[TFDefinitionKey, dict[str, list[dict[str, Any]]]] = TFParser().parse_directory(
directory=repo_root, # assume plan file is in the repo-root
out_definitions=tf_definitions,
out_parsing_errors=parsing_errors,
download_external_modules=download_external_modules,
)
repo_definitions[repo_root] = {'tf_definitions': tf_definitions, 'parsing_errors': parsing_errors}

enriched_resources = {}
for repo_root, parse_results in repo_definitions.items():
for full_file_path, definition in parse_results['tf_definitions'].items():
definitions = cast("dict[TFDefinitionKey, dict[str, list[dict[str, Any]]]]", parse_results['tf_definitions'])
for full_file_path, definition in definitions.items():
definitions_context = parser_registry.enrich_definitions_context((full_file_path, definition))
abs_scanned_file, _ = strip_terraform_module_referrer(file_path=full_file_path)
abs_scanned_file = full_file_path.file_path
scanned_file = os.path.relpath(abs_scanned_file, repo_root)
for block_type, block_value in definition.items():
if block_type in CHECK_BLOCK_TYPES:
Expand Down
1 change: 1 addition & 0 deletions checkov/common/typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
ResourceAttributesToOmit: TypeAlias = Dict[_Resource, _Attributes]
LibraryGraph: TypeAlias = "Union[DiGraph, Graph]"
LibraryGraphConnector: TypeAlias = "Union[DBConnector[DiGraph], DBConnector[Graph]]"
# TODO Remove this type and only use TFDefinitionKey
TFDefinitionKeyType: TypeAlias = "Union[str, TFDefinitionKey]"


Expand Down
1 change: 0 additions & 1 deletion checkov/common/util/env_vars_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ def __init__(self) -> None:
self.IGNORE_HIDDEN_DIRECTORIES = convert_str_to_bool(os.getenv("CKV_IGNORE_HIDDEN_DIRECTORIES", True))
self.MAX_FILE_SIZE = force_int(os.getenv("CHECKOV_MAX_FILE_SIZE", 5_000_000)) # 5 MB is default limit
self.MAX_IAC_FILE_SIZE = force_int(os.getenv("CHECKOV_MAX_IAC_FILE_SIZE", 50_000_000)) # 50 MB is default limit
self.NEW_TF_PARSER = convert_str_to_bool(os.getenv("CHECKOV_NEW_TF_PARSER", True))
self.NO_OUTPUT = convert_str_to_bool(os.getenv("CHECKOV_NO_OUTPUT", False))
self.OPENAI_MAX_FINDINGS = force_int(os.getenv("CKV_OPENAI_MAX_FINDINGS", 5))
self.OPENAI_MAX_TOKENS = force_int(os.getenv("CKV_OPENAI_MAX_TOKENS", 512))
Expand Down
106 changes: 1 addition & 105 deletions checkov/common/util/parser_utils.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,13 @@
from __future__ import annotations

import json
import os
import re
from dataclasses import dataclass
from enum import Enum
from typing import Any, List, Optional, Tuple
from typing import Any, List

import hcl2

from checkov.common.runners.base_runner import strtobool
from checkov.common.typing import TFDefinitionKeyType

_FUNCTION_NAME_CHARS = frozenset("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789")

_ARG_VAR_PATTERN = re.compile(r"[a-zA-Z_]+(\.[a-zA-Z_]+)+")
Expand Down Expand Up @@ -340,103 +336,3 @@ def to_string(value: Any) -> str:
elif value is False:
return "false"
return str(value)


def get_current_module_index(full_path: str) -> Optional[int]:
hcl_index = None
tf_index = None
if TERRAFORM_NESTED_MODULE_PATH_PREFIX not in full_path and TERRAFORM_NESTED_MODULE_INDEX_SEPARATOR not in full_path:
return len(full_path)
if f'.hcl{TERRAFORM_NESTED_MODULE_PATH_PREFIX}' in full_path:
hcl_index = full_path.index(f'.hcl{TERRAFORM_NESTED_MODULE_PATH_PREFIX}') + 4 # len('.hcl')
elif f'.hcl{TERRAFORM_NESTED_MODULE_INDEX_SEPARATOR}' in full_path:
hcl_index = full_path.index(f'.hcl{TERRAFORM_NESTED_MODULE_INDEX_SEPARATOR}') + 4 # len('.hcl')
if f'.tf{TERRAFORM_NESTED_MODULE_PATH_PREFIX}' in full_path:
tf_index = full_path.index(f'.tf{TERRAFORM_NESTED_MODULE_PATH_PREFIX}') + 3 # len('.tf')
elif f'.tf{TERRAFORM_NESTED_MODULE_INDEX_SEPARATOR}' in full_path:
tf_index = full_path.index(f'.tf{TERRAFORM_NESTED_MODULE_INDEX_SEPARATOR}') + 3 # len('.tf')
if hcl_index and tf_index:
# returning the index of the first file
return min(hcl_index, tf_index)
if hcl_index:
return hcl_index
return tf_index


def is_nested(full_path: TFDefinitionKeyType | None) -> bool:
from checkov.terraform.modules.module_objects import TFDefinitionKey
if isinstance(full_path, str):
return TERRAFORM_NESTED_MODULE_PATH_PREFIX in full_path
if isinstance(full_path, TFDefinitionKey):
return full_path.tf_source_modules is not None
return False


def get_tf_definition_key(nested_module: str, module_name: str, module_index: Any, nested_key: str = '') -> str:
return f"{nested_module}{TERRAFORM_NESTED_MODULE_PATH_PREFIX}{module_name}{TERRAFORM_NESTED_MODULE_INDEX_SEPARATOR}{module_index}{nested_key}{TERRAFORM_NESTED_MODULE_PATH_ENDING}"


def get_tf_definition_key_from_module_dependency(
path: str, module_dependency: str | None, module_dependency_num: str | None
) -> str:
if not module_dependency:
return path
if not is_nested(module_dependency):
return f"{path}{TERRAFORM_NESTED_MODULE_PATH_PREFIX}{module_dependency}{TERRAFORM_NESTED_MODULE_INDEX_SEPARATOR}{module_dependency_num}{TERRAFORM_NESTED_MODULE_PATH_ENDING}"
module_index = get_current_module_index(module_dependency)
return f"{path}{TERRAFORM_NESTED_MODULE_PATH_PREFIX}{module_dependency[:module_index]}{TERRAFORM_NESTED_MODULE_INDEX_SEPARATOR}{module_dependency_num}{module_dependency[module_index:]}{TERRAFORM_NESTED_MODULE_PATH_ENDING}"


def get_module_from_full_path(file_path: TFDefinitionKeyType | None) -> Tuple[TFDefinitionKeyType | None, str | None]:
from checkov.terraform.modules.module_objects import TFDefinitionKey
if not file_path or not is_nested(file_path):
return None, None
if isinstance(file_path, TFDefinitionKey):
if file_path.tf_source_modules is None:
return None, None
if strtobool(os.getenv('ENABLE_DEFINITION_KEY', 'False')):
return TFDefinitionKey(file_path=file_path.tf_source_modules.path, tf_source_modules=file_path.tf_source_modules.nested_tf_module), None
return file_path.tf_source_modules.path, None
tmp_path = file_path[file_path.index(TERRAFORM_NESTED_MODULE_PATH_PREFIX) + TERRAFORM_NESTED_MODULE_PATH_SEPARATOR_LENGTH: -TERRAFORM_NESTED_MODULE_PATH_SEPARATOR_LENGTH]
if is_nested(tmp_path):
module = get_abs_path(tmp_path) + tmp_path[tmp_path.index(TERRAFORM_NESTED_MODULE_PATH_PREFIX):]
index = tmp_path[tmp_path.index(TERRAFORM_NESTED_MODULE_INDEX_SEPARATOR) + TERRAFORM_NESTED_MODULE_PATH_SEPARATOR_LENGTH:tmp_path.index(TERRAFORM_NESTED_MODULE_PATH_PREFIX)]
else:
module = get_abs_path(tmp_path)
index = tmp_path[tmp_path.index(TERRAFORM_NESTED_MODULE_INDEX_SEPARATOR) + TERRAFORM_NESTED_MODULE_PATH_SEPARATOR_LENGTH:]
return module, index


def get_module_name(file_path: TFDefinitionKeyType) -> str | None:
from checkov.terraform.modules.module_objects import TFDefinitionKey
if isinstance(file_path, TFDefinitionKey):
if not file_path.tf_source_modules:
return None
module_name = file_path.tf_source_modules.name
if file_path.tf_source_modules.foreach_idx:
foreach_or_count = '"' if isinstance(file_path.tf_source_modules.foreach_idx, str) else ''
module_name = f'{module_name}[{foreach_or_count}{file_path.tf_source_modules.foreach_idx}{foreach_or_count}]'
return module_name
return None


def get_abs_path(file_path: TFDefinitionKeyType) -> str:
return file_path[:get_current_module_index(file_path)] if isinstance(file_path, str) else str(file_path.file_path)


def strip_terraform_module_referrer(file_path: str) -> tuple[str, str | None]:
"""
For file paths containing module referrer information (e.g.: "module/module.tf[main.tf#0]"), this
returns a tuple containing the file path (e.g., "module/module.tf") and referrer (e.g., "main.tf#0").
If the file path does not contain a referrer, the tuple will contain the original file path and None.
"""
if file_path.endswith(TERRAFORM_NESTED_MODULE_PATH_ENDING) and TERRAFORM_NESTED_MODULE_PATH_PREFIX in file_path:
return (
file_path[: file_path.index(TERRAFORM_NESTED_MODULE_PATH_PREFIX)],
file_path[
file_path.index(TERRAFORM_NESTED_MODULE_PATH_PREFIX)
+ TERRAFORM_NESTED_MODULE_PATH_SEPARATOR_LENGTH : -TERRAFORM_NESTED_MODULE_PATH_SEPARATOR_LENGTH
],
)
else:
return file_path, None
5 changes: 2 additions & 3 deletions checkov/terraform/context_parsers/base_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@
from checkov.common.comment.enum import COMMENT_REGEX
from checkov.common.models.enums import ContextCategories
from checkov.common.resource_code_logger_filter import add_resource_code_filter_to_logger
from checkov.common.typing import TFDefinitionKeyType
from checkov.common.util.parser_utils import get_abs_path
from checkov.terraform import TFDefinitionKey, get_abs_path
from checkov.terraform.context_parsers.registry import parser_registry

OPEN_CURLY = "{"
Expand Down Expand Up @@ -155,7 +154,7 @@ def _compute_definition_end_line(self, start_line_num: int) -> int:
return end_line_num

def run(
self, tf_file: TFDefinitionKeyType, definition_blocks: List[Dict[str, Any]], collect_skip_comments: bool = True
self, tf_file: TFDefinitionKey, definition_blocks: List[Dict[str, Any]], collect_skip_comments: bool = True
) -> Dict[str, Any]:
# TF files for loaded modules have this formation: <file>[<referrer>#<index>]
# Chop off everything after the file name for our purposes here
Expand Down
20 changes: 6 additions & 14 deletions checkov/terraform/context_parsers/registry.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
from __future__ import annotations

import logging
import os
from typing import Dict, TYPE_CHECKING, Tuple, List, Any

import dpath

from checkov.common.resource_code_logger_filter import add_resource_code_filter_to_logger
from checkov.common.runners.base_runner import strtobool
from checkov.common.typing import TFDefinitionKeyType
from checkov.terraform.modules.module_objects import TFDefinitionKey

if TYPE_CHECKING:
Expand All @@ -17,7 +14,7 @@

class ParserRegistry:
context_parsers: Dict[str, "BaseContextParser"] = {} # noqa: CCE003
definitions_context: Dict[TFDefinitionKeyType, Dict[str, Dict[str, Any]]] = {} # noqa: CCE003
definitions_context: Dict[TFDefinitionKey, Dict[str, Dict[str, Any]]] = {} # noqa: CCE003

def __init__(self) -> None:
self.logger = logging.getLogger(__name__)
Expand All @@ -30,25 +27,20 @@ def reset_definitions_context(self) -> None:
self.definitions_context = {}

def enrich_definitions_context(
self, definitions: Tuple[str, Dict[str, List[Dict[str, Any]]]], collect_skip_comments: bool = True
) -> Dict[TFDefinitionKeyType, Dict[str, Dict[str, Any]]]:
self, definitions: Tuple[TFDefinitionKey, Dict[str, List[Dict[str, Any]]]], collect_skip_comments: bool = True
) -> Dict[TFDefinitionKey, Dict[str, Dict[str, Any]]]:
supported_definitions = [parser_type for parser_type in self.context_parsers.keys()]
(tf_definition_key, definition_blocks_types) = definitions
enable_definition_key = strtobool(os.getenv('ENABLE_DEFINITION_KEY', 'False'))
if isinstance(tf_definition_key, TFDefinitionKey):
tf_file: TFDefinitionKeyType = tf_definition_key.file_path if not enable_definition_key else tf_definition_key
else:
tf_file = tf_definition_key if not enable_definition_key \
else TFDefinitionKey(file_path=tf_definition_key, tf_source_modules=None)

if definition_blocks_types:
definition_blocks_types = {x: definition_blocks_types[x] for x in definition_blocks_types.keys()}
for definition_type in definition_blocks_types.keys():
if definition_type in supported_definitions:
dpath.new(self.definitions_context, [tf_file, definition_type], {})
dpath.new(self.definitions_context, [tf_definition_key, definition_type], {})
context_parser = self.context_parsers[definition_type]
definition_blocks = definition_blocks_types[definition_type]
self.definitions_context[tf_file][definition_type] = context_parser.run(tf_file, definition_blocks, collect_skip_comments)
self.definitions_context[tf_definition_key][definition_type] = \
context_parser.run(tf_definition_key, definition_blocks, collect_skip_comments)
return self.definitions_context


Expand Down
17 changes: 3 additions & 14 deletions checkov/terraform/graph_builder/graph_components/blocks.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
from __future__ import annotations

import os
from typing import Union, Dict, Any, List, Optional, Set, TYPE_CHECKING, cast
import dpath
import re

from checkov.common.graph.graph_builder import CustomAttributes
from checkov.common.graph.graph_builder.utils import calculate_hash
from checkov.common.runners.base_runner import strtobool
from checkov.common.typing import TFDefinitionKeyType
from checkov.terraform.graph_builder.utils import INTERPOLATION_EXPR
from checkov.common.graph.graph_builder.graph_components.blocks import Block
Expand All @@ -21,8 +19,6 @@
class TerraformBlock(Block):
__slots__ = (
"module_connections",
"module_dependency",
"module_dependency_num",
"source_module",
"has_dynamic_block",
"dynamic_attributes",
Expand Down Expand Up @@ -62,8 +58,6 @@ def __init__(
has_dynamic_block=has_dynamic_block,
dynamic_attributes=dynamic_attributes,
)
self.module_dependency: TFDefinitionKeyType | None = ""
self.module_dependency_num: str | None = ""
if path:
self.path = path # type:ignore[assignment] # Block class would need to be a Generic type to make this pass
if attributes.get(RESOLVED_MODULE_ENTRY_NAME):
Expand All @@ -72,10 +66,9 @@ def __init__(
self.module_connections: Dict[str, List[int]] = {}
self.source_module: Set[int] = set()
self.has_dynamic_block = has_dynamic_block
if strtobool(os.getenv('CHECKOV_NEW_TF_PARSER', 'True')):
self.source_module_object: Optional[TFModule] = None
self.for_each_index: Optional[Any] = None
self.foreach_attrs: list[str] | None = None
self.source_module_object: Optional[TFModule] = None
self.for_each_index: Optional[Any] = None
self.foreach_attrs: list[str] | None = None

def __eq__(self, other: object) -> bool:
if not isinstance(other, TerraformBlock):
Expand Down Expand Up @@ -256,8 +249,6 @@ def to_dict(self) -> dict[str, Any]:
'config': self.config,
'id': self.id,
'module_connections': self.module_connections,
'module_dependency': self.module_dependency,
'module_dependency_num': self.module_dependency_num,
'name': self.name,
'path': self.path,
'source': self.source,
Expand All @@ -274,7 +265,5 @@ def from_dict(data: dict[str, Any]) -> TerraformBlock:

tf_block.breadcrumbs = data.get('breadcrumbs', {})
tf_block.module_connections = data.get('module_connections', {})
tf_block.module_dependency = data.get('module_dependency', '')
tf_block.source_module = data.get('source_module', set())
tf_block.module_dependency_num = data.get('module_dependency_num', '')
return tf_block
Loading

0 comments on commit f6751f2

Please sign in to comment.