diff --git a/checkov/common/goget/github/get_git.py b/checkov/common/goget/github/get_git.py index 329e99029d7..51ef9c604dd 100644 --- a/checkov/common/goget/github/get_git.py +++ b/checkov/common/goget/github/get_git.py @@ -14,7 +14,8 @@ git_import_error = e COMMIT_ID_PATTERN = re.compile(r"\?(ref=)(?P([0-9a-f]{40}))") -TAG_PATTERN = re.compile(r'\?(ref=)(?P(.*))') +TAG_PATTERN = re.compile(r'\?(ref=)(?P(.*))') # technically should be with ?ref=tags/ but this catches both +BRANCH_PATTERN = re.compile(r'\?(ref=heads/)(?P(.*))') class GitGetter(BaseGetter): @@ -23,6 +24,7 @@ def __init__(self, url: str, create_clone_and_result_dirs: bool = True) -> None: self.create_clone_and_res_dirs = create_clone_and_result_dirs self.tag = '' self.commit_id: str | None = None + self.branch = '' if "?ref" in url: url = self.extract_git_ref(url=url) @@ -30,6 +32,14 @@ def __init__(self, url: str, create_clone_and_result_dirs: bool = True) -> None: super().__init__(url) def extract_git_ref(self, url: str) -> str: + search_branch = re.search(BRANCH_PATTERN, url) + if search_branch: + self.branch = search_branch.group("branch") + # remove heads/ from ref= to get actual branch name + # self.branch = re.sub('heads.*/', '', url) + url = re.sub(BRANCH_PATTERN, '', url) + return url + search_commit_id = re.search(COMMIT_ID_PATTERN, url) if search_commit_id: self.commit_id = search_commit_id.group("commit_id") @@ -71,7 +81,9 @@ def do_get(self) -> str: def _clone(self, git_url: str, clone_dir: str) -> None: self.logger.debug(f"cloning {self.url if '@' not in self.url else self.url.split('@')[1]} to {clone_dir}") with temp_environ(GIT_TERMINAL_PROMPT="0"): # disables user prompts originating from GIT - if self.commit_id: + if self.branch: + Repo.clone_from(git_url, clone_dir, branch=self.branch, depth=1) # depth=1 for shallow clone + elif self.commit_id: # no commit id support for branch repo = Repo.clone_from(git_url, clone_dir, no_checkout=True) # need to be a full git clone repo.git.checkout(self.commit_id) elif self.tag: diff --git a/checkov/common/util/ext_argument_parser.py b/checkov/common/util/ext_argument_parser.py index 0701dd270db..f36d4641e75 100644 --- a/checkov/common/util/ext_argument_parser.py +++ b/checkov/common/util/ext_argument_parser.py @@ -156,8 +156,9 @@ def add_parser_args(self) -> None: self.add( "--external-checks-git", action="append", - help="Github url of external checks to be added. \n you can specify a subdirectory after a " - "double-slash //. \n cannot be used together with --external-checks-dir", + help="Github url of external checks to be added.\n you can specify a subdirectory after a double-slash //." + "\n possible to use ?ref=tags/tagName or ?ref=heads/branchName or ?ref=commit_id" + "\n cannot be used together with --external-checks-dir", ) self.add( "-l", diff --git a/checkov/kubernetes/image_referencer/provider/k8s.py b/checkov/kubernetes/image_referencer/provider/k8s.py index ab9bd251cf0..364d7a69762 100644 --- a/checkov/kubernetes/image_referencer/provider/k8s.py +++ b/checkov/kubernetes/image_referencer/provider/k8s.py @@ -19,77 +19,55 @@ def __init__(self, graph_connector: DiGraph) -> None: def extract_images_from_cron_job(resource: dict[str, Any]) -> list[str]: - image_names: list[str] = [] - spec = find_in_dict(input_dict=resource, key_path="spec/jobTemplate/spec/template/spec") - if isinstance(spec, dict): - containers = spec.get("containers") - image_names.extend(extract_images_from_containers(containers=containers)) - - containers = spec.get("initContainers") - image_names.extend(extract_images_from_containers(containers=containers)) - - return image_names + return _extract_images_from_spec(spec) def extract_images_from_pod(resource: dict[str, Any]) -> list[str]: - image_names: list[str] = [] - spec = resource.get("spec") - if isinstance(spec, dict): - containers = spec.get("containers") - image_names.extend(extract_images_from_containers(containers=containers)) - - containers = spec.get("initContainers") - image_names.extend(extract_images_from_containers(containers=containers)) - - return image_names + return _extract_images_from_spec(spec) def extract_images_from_pod_template(resource: dict[str, Any]) -> list[str]: # the 'PodTemplate' object is usually not defined by the user, but rather used by Kubernetes internally - image_names: list[str] = [] - spec = find_in_dict(input_dict=resource, key_path="template/spec") - if isinstance(spec, dict): - containers = spec.get("containers") - image_names.extend(extract_images_from_containers(containers=containers)) - - containers = spec.get("initContainers") - image_names.extend(extract_images_from_containers(containers=containers)) - - return image_names + return _extract_images_from_spec(spec) def extract_images_from_template(resource: dict[str, Any]) -> list[str]: - image_names: list[str] = [] - spec = find_in_dict(input_dict=resource, key_path="spec/template/spec") - if isinstance(spec, dict): - containers = spec.get("containers") - image_names.extend(extract_images_from_containers(containers=containers)) - - containers = spec.get("initContainers") - image_names.extend(extract_images_from_containers(containers=containers)) - - return image_names + return _extract_images_from_spec(spec) -def extract_images_from_containers(containers: Any) -> list[str]: +def extract_images_from_containers(containers: Any) -> set[str]: """Helper function to extract image names from containers block""" - image_names: list[str] = [] + image_names: set[str] = set() if isinstance(containers, list): for container in containers: if isinstance(container, dict): image = container.get("image") if image and isinstance(image, str): - image_names.append(image) + image_names.add(image) return image_names +def _extract_images_from_spec(spec: dict[str, Any] | None) -> list[str]: + image_names: set[str] = set() + + if isinstance(spec, dict): + containers = spec.get("containers") + image_names.update(extract_images_from_containers(containers=containers)) + + containers = spec.get("initContainers") + image_names.update(extract_images_from_containers(containers=containers)) + + # Makes sure we return no duplications + return list(image_names) + + # needs to be at the bottom to add the defined functions SUPPORTED_K8S_IMAGE_RESOURCE_TYPES: "dict[str, _ExtractImagesCallableAlias]" = { "CronJob": extract_images_from_cron_job, diff --git a/checkov/main.py b/checkov/main.py index 516625a691e..2be2576d142 100755 --- a/checkov/main.py +++ b/checkov/main.py @@ -644,7 +644,7 @@ def commit_repository(self) -> str | None: def get_external_checks_dir(self) -> list[str]: external_checks_dir: "list[str]" = self.config.external_checks_dir if self.config.external_checks_git: - git_getter = GitGetter(self.config.external_checks_git[0]) + git_getter = GitGetter(url=self.config.external_checks_git[0]) external_checks_dir = [git_getter.get()] atexit.register(shutil.rmtree, str(Path(external_checks_dir[0]).parent)) return external_checks_dir diff --git a/checkov/terraform/checks/resource/aws/MQBrokerVersion.py b/checkov/terraform/checks/resource/aws/MQBrokerVersion.py index a48a7373d28..808a4529a22 100644 --- a/checkov/terraform/checks/resource/aws/MQBrokerVersion.py +++ b/checkov/terraform/checks/resource/aws/MQBrokerVersion.py @@ -3,8 +3,10 @@ from checkov.common.models.enums import CheckCategories, CheckResult from checkov.terraform.checks.resource.base_resource_check import BaseResourceCheck -minimumActiveMQ = 5.16 -minimumRabbitMQ = 3.8 +ENGINE_VERSION_PATTERN = re.compile(r"(\d+\.\d+.\d+)") +ENGINE_VERSION_SHORT_PATTERN = re.compile(r"(\d+\.\d+)") +MINIMUM_ACTIVEMQ_VERSION = 5.16 +MINIMUM_RABBITMQ_VERSION = 3.8 class MQBrokerVersion(BaseResourceCheck): @@ -19,15 +21,15 @@ def scan_resource_conf(self, conf) -> CheckResult: if conf.get("engine_type"): mq_type = conf.get("engine_type")[0] semantic = conf.get("engine_version", [''])[0] - if not re.search(r'(\d+\.\d+.\d+)', semantic): + if not re.search(ENGINE_VERSION_PATTERN, semantic): return CheckResult.UNKNOWN - version = float(re.search(r'(\d+\.\d+)', semantic).group()) + version = float(re.search(ENGINE_VERSION_SHORT_PATTERN, semantic).group()) if mq_type in 'ActiveMQ': - if version >= minimumActiveMQ: + if version >= MINIMUM_ACTIVEMQ_VERSION: return CheckResult.PASSED if mq_type in 'RabbitMQ': - if version >= minimumRabbitMQ: + if version >= MINIMUM_RABBITMQ_VERSION: return CheckResult.PASSED return CheckResult.FAILED diff --git a/checkov/terraform/checks/resource/base_resource_value_check.py b/checkov/terraform/checks/resource/base_resource_value_check.py index 3375292dde8..07cfb80f25f 100644 --- a/checkov/terraform/checks/resource/base_resource_value_check.py +++ b/checkov/terraform/checks/resource/base_resource_value_check.py @@ -5,6 +5,7 @@ import dpath import re + from checkov.terraform.checks.resource.base_resource_check import BaseResourceCheck from checkov.common.models.enums import CheckResult, CheckCategories from checkov.common.models.consts import ANY_VALUE @@ -12,6 +13,8 @@ from checkov.terraform.graph_builder.utils import get_referenced_vertices_in_value from checkov.terraform.parser_functions import handle_dynamic_values +ARRAY_INDEX_PATTERN = re.compile(r"^\[?\d+]?$") + class BaseResourceValueCheck(BaseResourceCheck): def __init__( @@ -32,7 +35,7 @@ def _filter_key_path(path: str) -> List[str]: :param path: valid JSONPath of an attribute :return: List of named attributes with respect to the input JSONPath order """ - return [x for x in path.split("/") if not re.search(re.compile(r"^\[?\d+]?$"), x)] + return [x for x in path.split("/") if not re.search(ARRAY_INDEX_PATTERN, x)] @staticmethod def _is_nesting_key(inspected_attributes: List[str], key: List[str]) -> bool: diff --git a/checkov/terraform/context_parsers/base_parser.py b/checkov/terraform/context_parsers/base_parser.py index 87a1b3db8c6..00dbc82be4c 100644 --- a/checkov/terraform/context_parsers/base_parser.py +++ b/checkov/terraform/context_parsers/base_parser.py @@ -83,8 +83,7 @@ def _read_file_lines(self) -> List[Tuple[int, str]]: @staticmethod def is_optional_comment_line(line: str) -> bool: - line_without_whitespace = line.replace(" ", "") - return "checkov:skip=" in line_without_whitespace or "bridgecrew:skip=" in line_without_whitespace + return "checkov:skip=" in line or "bridgecrew:skip=" in line def _collect_skip_comments(self, definition_blocks: List[Dict[str, Any]]) -> Dict[str, Any]: """ diff --git a/checkov/terraform/evaluation/base_variable_evaluation.py b/checkov/terraform/evaluation/base_variable_evaluation.py index 7b5baef1869..49b0dd2458a 100644 --- a/checkov/terraform/evaluation/base_variable_evaluation.py +++ b/checkov/terraform/evaluation/base_variable_evaluation.py @@ -31,11 +31,6 @@ def evaluate_variables(self) -> Any: """ raise NotImplementedError() - @staticmethod - def _is_variable_only_expression(assignment_regex: str, entry_expression: str) -> bool: - exact_assignment_regex = re.compile(r"^" + assignment_regex + r"$") - return len(re.findall(exact_assignment_regex, entry_expression)) > 0 - @staticmethod def extract_context_path(definition_path: str) -> Tuple[str, str]: """ diff --git a/checkov/terraform/graph_builder/foreach/abstract_handler.py b/checkov/terraform/graph_builder/foreach/abstract_handler.py index e264682ded3..afaa4e02868 100644 --- a/checkov/terraform/graph_builder/foreach/abstract_handler.py +++ b/checkov/terraform/graph_builder/foreach/abstract_handler.py @@ -252,10 +252,13 @@ def extract_from_list(val: Any) -> Any: @staticmethod def need_to_add_quotes(code: str, key: str) -> bool: - patterns = [r'lower\(' + key + r'\)', r'upper\(' + key + r'\)'] - for pattern in patterns: - if re.search(pattern, code): - return True + if "lower" in code or "upper" in code: + patterns = (r'lower\(' + key + r'\)', r'upper\(' + key + r'\)') + for pattern in patterns: + if re.search(pattern, code): + return True + if f'[{key}]' in code: return True + return False diff --git a/checkov/terraform/graph_builder/graph_components/blocks.py b/checkov/terraform/graph_builder/graph_components/blocks.py index bcc39d1d1f1..b6f3bc800be 100644 --- a/checkov/terraform/graph_builder/graph_components/blocks.py +++ b/checkov/terraform/graph_builder/graph_components/blocks.py @@ -117,10 +117,11 @@ def _collect_dynamic_dependent_keys(self, dynamic_block_name: str, value: str | dynamic_content_path: List[str], dynamic_changed_attributes: List[str]) -> None: if isinstance(value, str): dynamic_ref = f'{dynamic_block_name}.value' - interpolation_matches = re.findall(INTERPOLATION_EXPR, value) - for match in interpolation_matches: - if dynamic_ref in match: - dynamic_changed_attributes.append(key_path) + if "${" in value: + interpolation_matches = re.findall(INTERPOLATION_EXPR, value) + for match in interpolation_matches: + if dynamic_ref in match: + dynamic_changed_attributes.append(key_path) elif isinstance(value, list): for idx, sub_value in enumerate(value): self._collect_dynamic_dependent_keys( diff --git a/checkov/terraform/graph_builder/utils.py b/checkov/terraform/graph_builder/utils.py index 9579f75c926..e0b576a5951 100644 --- a/checkov/terraform/graph_builder/utils.py +++ b/checkov/terraform/graph_builder/utils.py @@ -4,7 +4,7 @@ import os import re from typing import Tuple -from typing import Union, List, Any, Dict, Optional, Callable, TYPE_CHECKING +from typing import Union, List, Any, Dict, Optional, TYPE_CHECKING import igraph @@ -61,66 +61,92 @@ def extract_module_dependency_path(module_dependency: str | List[str]) -> List[s ] -BLOCK_TYPES_STRINGS = ["var", "local", "module", "data"] +BLOCK_TYPES_STRINGS = ("var", "local", "module", "data") FUNC_CALL_PREFIX_PATTERN = re.compile(r"([.a-zA-Z]+)\(") -INTERPOLATION_PATTERN = re.compile(r"[${}]") INTERPOLATION_EXPR = re.compile(r"\$\{([^\}]*)\}") INDEX_PATTERN = re.compile(r"\[([0-9]+)\]") MAP_ATTRIBUTE_PATTERN = re.compile(r"\[\"([^\d\W]\w*)\"\]") +NESTED_ATTRIBUTE_PATTERN = re.compile(r"\.\d+") def get_vertices_references( str_value: str, aliases: Dict[str, Dict[str, str]], resources_types: List[str] ) -> List[TerraformVertexReference]: - vertices_references = [] + has_interpolation = True if "${" in str_value else False + vertices_references: "list[TerraformVertexReference]" = [] words_in_str_value = str_value.split() for word in words_in_str_value: - if word.startswith(".") or word.startswith(r"/."): + if word.startswith((".", r"/.")): # check if word is a relative path continue - interpolations = re.split(INTERPOLATION_EXPR, word) - for interpolation_content in interpolations: - for w in interpolation_content.split(","): - word_sub_parts = w.split(".") - if len(word_sub_parts) <= 1 or word_sub_parts[0].isnumeric(): - # if the word doesn't contain a '.' char, or if the first part before the dot is a number - continue - - suspected_block_type = word_sub_parts[0] - if suspected_block_type in BLOCK_TYPES_STRINGS: - # matching cases like 'var.x' - vertex_reference = TerraformVertexReference( - block_type=suspected_block_type, sub_parts=word_sub_parts[1:], origin_value=w - ) - if vertex_reference not in vertices_references: - vertices_references.append(vertex_reference) - continue - - vertex_reference_alias = get_vertex_reference_from_alias(suspected_block_type, aliases, word_sub_parts) - if vertex_reference_alias and vertex_reference_alias not in vertices_references: - vertex_reference_alias.origin_value = w - # matching cases where the word is referring an alias - vertices_references.append(vertex_reference_alias) - continue - - # matching cases like 'aws_vpc.main' - if word_sub_parts[0] in resources_types: - block_name = word_sub_parts[0] + "." + word_sub_parts[1] - word_sub_parts = [block_name] + word_sub_parts[2:] - vertex_reference = TerraformVertexReference( - block_type=BlockType.RESOURCE, sub_parts=word_sub_parts, origin_value=w - ) - if vertex_reference not in vertices_references: - vertices_references.append(vertex_reference) + if has_interpolation: + interpolations = re.split(INTERPOLATION_EXPR, word) + for interpolation_content in interpolations: + add_vertices_references_from_word( + vertices_references=vertices_references, + word=interpolation_content, + aliases=aliases, + resources_types=resources_types, + ) + else: + add_vertices_references_from_word( + vertices_references=vertices_references, + word=word, + aliases=aliases, + resources_types=resources_types, + ) return vertices_references +def add_vertices_references_from_word( + vertices_references: list[TerraformVertexReference], + word: str, + aliases: dict[str, dict[str, str]], + resources_types: list[str], +) -> None: + for w in word.split(","): + word_sub_parts = w.split(".") + if len(word_sub_parts) <= 1 or word_sub_parts[0].isnumeric(): + # if the word doesn't contain a '.' char, or if the first part before the dot is a number + continue + + suspected_block_type = word_sub_parts[0] + if suspected_block_type in BLOCK_TYPES_STRINGS: + # matching cases like 'var.x' + vertex_reference = TerraformVertexReference( + block_type=suspected_block_type, sub_parts=word_sub_parts[1:], origin_value=w + ) + if vertex_reference not in vertices_references: + vertices_references.append(vertex_reference) + continue + + vertex_reference_alias = get_vertex_reference_from_alias(suspected_block_type, aliases, word_sub_parts) + if vertex_reference_alias and vertex_reference_alias not in vertices_references: + vertex_reference_alias.origin_value = w + # matching cases where the word is referring an alias + vertices_references.append(vertex_reference_alias) + continue + + # matching cases like 'aws_vpc.main' + if word_sub_parts[0] in resources_types: + block_name = word_sub_parts[0] + "." + word_sub_parts[1] + word_sub_parts = [block_name] + word_sub_parts[2:] + vertex_reference = TerraformVertexReference( + block_type=BlockType.RESOURCE, sub_parts=word_sub_parts, origin_value=w + ) + if vertex_reference not in vertices_references: + vertices_references.append(vertex_reference) + + def get_vertex_reference_from_alias( block_type_str: str, aliases: Dict[str, Dict[str, str]], val: List[str] ) -> Optional[TerraformVertexReference]: + if not aliases: + return None + block_type = "" if block_type_str in aliases: block_type = aliases[block_type_str][CustomAttributes.BLOCK_TYPE] @@ -133,24 +159,40 @@ def get_vertex_reference_from_alias( def remove_function_calls_from_str(str_value: str) -> str: + if "(" not in str_value: + # otherwise it can't be a function call + return str_value + # remove start of function calls:: 'length(aws_vpc.main) > 0 ? aws_vpc.main[0].cidr_block : ${var.x}' --> 'aws_vpc.main) > 0 ? aws_vpc.main[0].cidr_block : ${var.x}' str_value = re.sub(FUNC_CALL_PREFIX_PATTERN, "", str_value) # remove ')' - return re.sub(re.compile(r"[)]+"), "", str_value) + return str_value.replace(")", "") def remove_index_pattern_from_str(str_value: str) -> str: + if "[" not in str_value: + # otherwise it can't be accessed via index + return str_value + str_value = re.sub(INDEX_PATTERN, "", str_value) str_value = str_value.replace('["', CHECKOV_LOREM_IPSUM_VAL).replace("[", " [ ").replace(CHECKOV_LOREM_IPSUM_VAL, '["') str_value = str_value.replace('"]', CHECKOV_LOREM_IPSUM_VAL).replace("]", " ] ").replace(CHECKOV_LOREM_IPSUM_VAL, '"]') return str_value -def remove_interpolation(str_value: str, replace_str: str = " ") -> str: - return re.sub(INTERPOLATION_PATTERN, replace_str, str_value) +def remove_interpolation(str_value: str) -> str: + if "${" not in str_value: + # otherwise it can't be a string interpolation + return str_value + + return str_value.replace("${", " ").replace("}", " ") def replace_map_attribute_access_with_dot(str_value: str) -> str: + if "[\"" not in str_value: + # otherwise it can't be accessed via named index + return str_value + split_by_identifiers = re.split(MAP_ATTRIBUTE_PATTERN, str_value) new_split = [] for split_part in split_by_identifiers: @@ -163,19 +205,10 @@ def replace_map_attribute_access_with_dot(str_value: str) -> str: return ".".join(new_split) -DEFAULT_CLEANUP_FUNCTIONS: List[Callable[[str], str]] = [ - remove_function_calls_from_str, - remove_index_pattern_from_str, - replace_map_attribute_access_with_dot, - remove_interpolation, -] - - def get_referenced_vertices_in_value( value: Union[str, List[str], Dict[str, str]], aliases: Dict[str, Dict[str, str]], resources_types: List[str], - cleanup_functions: Optional[List[Callable[[str], str]]] = None, ) -> List[TerraformVertexReference]: references_vertices: "list[TerraformVertexReference]" = [] @@ -183,36 +216,53 @@ def get_referenced_vertices_in_value( # bool/int values can't have a references to other vertices return references_vertices - if cleanup_functions is None: - cleanup_functions = DEFAULT_CLEANUP_FUNCTIONS - if isinstance(value, list): for sub_value in value: references_vertices += get_referenced_vertices_in_value( - sub_value, aliases, resources_types, cleanup_functions + sub_value, aliases, resources_types ) if isinstance(value, dict): for sub_value in value.values(): references_vertices += get_referenced_vertices_in_value( - sub_value, aliases, resources_types, cleanup_functions + sub_value, aliases, resources_types ) if isinstance(value, str): - value_len = len(value) - if CHECKOV_RENDER_MAX_LEN and 0 < CHECKOV_RENDER_MAX_LEN < value_len: - logging.debug(f'Rendering was skipped for a {value_len}-character-long string. If you wish to have it ' - f'evaluated, please set the environment variable CHECKOV_RENDER_MAX_LEN ' - f'to {str(value_len + 1)} or to 0 to allow rendering of any length') - else: - if value_len < 5 or "." not in value: - # the shortest reference is 'var.a' and references are done via dot notation - return references_vertices + references_vertices = get_referenced_vertices_in_str_value( + str_value=value, + aliases=aliases, + resources_types=resources_types, + ) + + return references_vertices - if cleanup_functions: - for func in cleanup_functions: - value = func(value) - references_vertices = get_vertices_references(value, aliases, resources_types) + +def get_referenced_vertices_in_str_value( + str_value: str, + aliases: dict[str, dict[str, str]], + resources_types: list[str], +) -> list[TerraformVertexReference]: + references_vertices: "list[TerraformVertexReference]" = [] + + value_len = len(str_value) + if CHECKOV_RENDER_MAX_LEN and 0 < CHECKOV_RENDER_MAX_LEN < value_len: + logging.debug( + f'Rendering was skipped for a {value_len}-character-long string. If you wish to have it ' + f'evaluated, please set the environment variable CHECKOV_RENDER_MAX_LEN ' + f'to {str(value_len + 1)} or to 0 to allow rendering of any length' + ) + else: + if value_len < 5 or "." not in str_value: + # the shortest reference is 'var.a' and references are done via dot notation + return references_vertices + + str_value = remove_function_calls_from_str(str_value=str_value) + str_value = remove_index_pattern_from_str(str_value=str_value) + str_value = replace_map_attribute_access_with_dot(str_value=str_value) + str_value = remove_interpolation(str_value=str_value) + + references_vertices = get_vertices_references(str_value, aliases, resources_types) return references_vertices @@ -259,7 +309,7 @@ def attribute_has_nested_attributes(attribute_key: str, attributes: Dict[str, An prefixes_with_attribute_key = [] else: prefixes_with_attribute_key = [a for a in attributes if a.startswith(attribute_key) and a != attribute_key] - if not any(re.findall(re.compile(r"\.\d+"), a) for a in prefixes_with_attribute_key): + if not any(re.findall(NESTED_ATTRIBUTE_PATTERN, a) for a in prefixes_with_attribute_key): # if there aro no numeric parts in the key such as key1.0.key2 return isinstance(attributes[attribute_key], dict) return isinstance(attributes[attribute_key], list) or isinstance(attributes[attribute_key], dict) @@ -292,9 +342,7 @@ def get_related_resource_id(resource: dict[str, Any], file_path_to_referred_id: def get_file_path_to_referred_id_networkx(graph_object: DiGraph) -> dict[str, str]: file_path_to_module_id = {} - for node in graph_object.nodes.values(): - if node.get(CustomAttributes.BLOCK_TYPE) == BlockType.MODULE: - modules = node + modules = [node for node in graph_object.nodes.values() if node.get(CustomAttributes.BLOCK_TYPE) == BlockType.MODULE] for modules_data in modules: diff --git a/checkov/terraform/graph_builder/variable_rendering/evaluate_terraform.py b/checkov/terraform/graph_builder/variable_rendering/evaluate_terraform.py index 276becf4726..90ccc95f3f6 100644 --- a/checkov/terraform/graph_builder/variable_rendering/evaluate_terraform.py +++ b/checkov/terraform/graph_builder/variable_rendering/evaluate_terraform.py @@ -19,6 +19,8 @@ # exclude "']" one the right side of the compare via (?!']), this can happen with a base64 encoded string COMPARE_REGEX = re.compile(r"^(?P.+?)\s*(?P==|!=|>=|>|<=|<|&&|\|\|)\s*(?P(?!']).+)$") +COMPARE_OPERATORS = (" == ", " != ", " < ", " <= ", " > ", " >= ", " && ", " || ") + CHECKOV_RENDER_MAX_LEN = force_int(os.getenv("CHECKOV_RENDER_MAX_LEN", "10000")) @@ -181,9 +183,9 @@ def strip_double_quotes(input_str: str) -> str: def evaluate_conditional_expression(input_str: str) -> str: - variable_ref = re.match(re.compile(r"^\${(.*)}$"), input_str) - if variable_ref: - input_str = variable_ref.groups()[0] + if input_str.startswith("${") and input_str.endswith("}"): + # just remove the needed char length of the interpolation marks + input_str = input_str[2:-1] condition = find_conditional_expression_groups(input_str) while condition: @@ -213,6 +215,10 @@ def evaluate_compare(input_str: str) -> str | bool | int: :return: evaluation of the expression """ if isinstance(input_str, str) and "for" not in input_str: + if not any(operator in input_str for operator in COMPARE_OPERATORS): + # if an operator doesn't exist in the string, no need to proceed + return input_str + match = re.search(COMPARE_REGEX, input_str) if match: compare_parts = match.groupdict() @@ -391,6 +397,10 @@ def apply_binary_op(a: Optional[Union[str, int, bool]], b: Optional[Union[str, i def evaluate_directives(input_str: str) -> str: + if "%{" not in input_str: + # no need to proceed further + return input_str + if re.search(DIRECTIVE_EXPR, input_str) is None: return input_str diff --git a/checkov/terraform/graph_builder/variable_rendering/safe_eval_functions.py b/checkov/terraform/graph_builder/variable_rendering/safe_eval_functions.py index 2f6dc069dc9..6f366bf466f 100644 --- a/checkov/terraform/graph_builder/variable_rendering/safe_eval_functions.py +++ b/checkov/terraform/graph_builder/variable_rendering/safe_eval_functions.py @@ -8,6 +8,8 @@ from checkov.terraform.parser_functions import tonumber, FUNCTION_FAILED, create_map, tobool, tostring +TIME_DELTA_PATTERN = re.compile(r"(\d*\.*\d+)") + """ This file contains a custom implementation of the builtin `eval` function. `eval` is not a safe function, because it can execute *every* command, @@ -139,7 +141,7 @@ def timeadd(input_str: str, time_delta: str) -> str: adding = False time_delta = time_delta[1:] # Split out into each of the deltas - deltas = re.split(r'(\d*\.*\d+)', time_delta) + deltas = re.split(TIME_DELTA_PATTERN, time_delta) # Needed to strip the leading empty element deltas = list(filter(None, deltas)) while len(deltas) > 0: diff --git a/checkov/terraform/module_loading/loaders/git_loader.py b/checkov/terraform/module_loading/loaders/git_loader.py index 18bbf9ddc16..590fe196521 100644 --- a/checkov/terraform/module_loading/loaders/git_loader.py +++ b/checkov/terraform/module_loading/loaders/git_loader.py @@ -15,6 +15,7 @@ from checkov.terraform.module_loading.module_params import ModuleParams DEFAULT_MODULE_SOURCE_PREFIX = "git::https://" +GIT_USER_PATTERN = re.compile(r"^(.*?@).*") @dataclass(frozen=True) @@ -112,9 +113,11 @@ def _parse_module_source(self, module_params: ModuleParams) -> ModuleSource: else: raise Exception("invalid git url") - username = re.match(re.compile(r"^(.*?@).*"), root_module) - if username and username[1] != "git@": - root_module = root_module.replace(username[1], "") + username = None + if "@" in root_module: + username = re.match(GIT_USER_PATTERN, root_module) + if username and username[1] != "git@": + root_module = root_module.replace(username[1], "") if root_module.endswith(".git"): root_module = root_module[:-4] diff --git a/checkov/terraform/module_loading/loaders/local_path_loader.py b/checkov/terraform/module_loading/loaders/local_path_loader.py index c84fa11c4b4..392b02d3739 100644 --- a/checkov/terraform/module_loading/loaders/local_path_loader.py +++ b/checkov/terraform/module_loading/loaders/local_path_loader.py @@ -12,6 +12,8 @@ if TYPE_CHECKING: from checkov.terraform.module_loading.module_params import ModuleParams +WINDOWS_MODULE_SOURCE_PATH_PATTERN = re.compile("[a-zA-Z]:\\\\") + class LocalPathLoader(ModuleLoader): def __init__(self) -> None: @@ -22,17 +24,12 @@ def discover(self, module_params: ModuleParams) -> None: pass def _is_matching_loader(self, module_params: ModuleParams) -> bool: - if ( - module_params.module_source.startswith("./") - or module_params.module_source.startswith("../") - or module_params.module_source.startswith(module_params.current_dir) - or module_params.module_source.startswith("/") - ): + if module_params.module_source.startswith(("./", "../", module_params.current_dir, "/")): return True if platform.system() == "Windows": logging.debug("Platform: Windows") - if re.match(re.compile("[a-zA-Z]:\\\\"), module_params.module_source): + if re.match(WINDOWS_MODULE_SOURCE_PATH_PATTERN, module_params.module_source): return True return False diff --git a/checkov/terraform/module_loading/module_finder.py b/checkov/terraform/module_loading/module_finder.py index 6c093342aed..0d35ab186d4 100644 --- a/checkov/terraform/module_loading/module_finder.py +++ b/checkov/terraform/module_loading/module_finder.py @@ -8,6 +8,9 @@ from checkov.common.parallelizer.parallel_runner import parallel_runner from checkov.terraform.module_loading.registry import module_loader_registry +MODULE_SOURCE_PATTERN = re.compile(r'[^#]*\bsource\s*=\s*"(?P.*)"') +MODULE_VERSION_PATTERN = re.compile(r'[^#]*\bversion\s*=\s*"(?P=|!=|>=|>|<=|<|~>)?\s*(?P[\d.]+-?\w*)"') + class ModuleDownload: def __init__(self, source_dir: str) -> None: @@ -24,26 +27,27 @@ def address(self) -> str: def find_modules(path: str) -> List[ModuleDownload]: - modules_found = [] + modules_found: list[ModuleDownload] = [] + for root, _, full_file_names in os.walk(path): for file_name in full_file_names: if not file_name.endswith('.tf'): continue with open(os.path.join(path, root, file_name)) as f: + content = f.read() + if "module " not in content: + # if there is no "module " ref in the whole file, then no need to search line by line + continue + try: - in_module = False curr_md = None - for line in f: - if line.strip().startswith('#'): - continue - if not in_module: + for line in content.splitlines(): + if not curr_md: if line.startswith('module'): - in_module = True curr_md = ModuleDownload(os.path.dirname(os.path.join(root, file_name))) continue - if in_module and curr_md: + else: if line.startswith('}'): - in_module = False if curr_md.module_link is None: logging.warning(f'A module at {curr_md.source_dir} had no source, skipping') else: @@ -51,14 +55,16 @@ def find_modules(path: str) -> List[ModuleDownload]: curr_md = None continue - match = re.match(re.compile('.*\\bsource\\s*=\\s*"(?P.*)"'), line) - if match: - curr_md.module_link = match.group('LINK') - continue + if "source" in line: + match = re.match(MODULE_SOURCE_PATTERN, line) + if match: + curr_md.module_link = match.group('link') + continue - match = re.match(re.compile('.*\\bversion\\s*=\\s*"(?P=|!=|>=|>|<=|<|~>)?\\s*(?P[\\d.]+-?\\w*)"'), line) - if match: - curr_md.version = f"{match.group('operator')}{match.group('version')}" if match.group('operator') else match.group('version') + if "version" in line: + match = re.match(MODULE_VERSION_PATTERN, line) + if match: + curr_md.version = f"{match.group('operator')}{match.group('version')}" if match.group('operator') else match.group('version') except (UnicodeDecodeError, FileNotFoundError) as e: logging.warning(f"Skipping {os.path.join(path, root, file_name)} because of {e}") continue diff --git a/checkov/terraform/modules/module_utils.py b/checkov/terraform/modules/module_utils.py index 1bf91a1128a..b44327d9352 100644 --- a/checkov/terraform/modules/module_utils.py +++ b/checkov/terraform/modules/module_utils.py @@ -107,7 +107,7 @@ def remove_module_dependency_from_path(path: str) -> str: :param path: path that looks like "dir/main.tf[other_dir/x.tf#0] :return: only the outer path: dir/main.tf """ - if re.findall(RESOLVED_MODULE_PATTERN, path): + if "#" in path: path = re.sub(RESOLVED_MODULE_PATTERN, '', path) return path diff --git a/checkov/version.py b/checkov/version.py index f789a02f414..4150662ca26 100644 --- a/checkov/version.py +++ b/checkov/version.py @@ -1 +1 @@ -version = '2.3.326' +version = '2.3.328' diff --git a/docs/2.Basics/CLI Command Reference.md b/docs/2.Basics/CLI Command Reference.md index e8dbdd19c6c..02a22e7b913 100644 --- a/docs/2.Basics/CLI Command Reference.md +++ b/docs/2.Basics/CLI Command Reference.md @@ -16,7 +16,7 @@ nav_order: 2 | `-f, --file FILE` | File to scan (can not be used together with --directory). With this option, Checkov will attempt to filter the runners based on the file type. For example, if you specify a ".tf" file, only the terraform and secrets frameworks will be included. You can further limit this (e.g., skip secrets) by using the --skip-framework argument. | | `--skip-path SKIP_PATH` | Path (file or directory) to skip, using regular expression logic, relative to the current working directory. Word boundaries are not implicit; i.e., specifying "dir1" will skip any directory or subdirectory named "dir1". Ignored with -f. Can be specified multiple times. | | `--external-checks-dir EXTERNAL_CHECKS_DIR` | Directory for custom checks to be loaded. Can be repeated | -| `--external-checks-git EXTERNAL_CHECKS_GIT` | GitHub URL of external checks to be added. you can specify a subdirectory after a double-slash //. Cannot be used together with --external-checks-dir | +| `--external-checks-git EXTERNAL_CHECKS_GIT` | GitHub URL of external checks to be added. you can specify a subdirectory after a double-slash //. possible to use ?ref=tags/tagName or ?ref=heads/branchName or ?ref=commit_id. Cannot be used together with --external-checks-dir | | `-l, --list` | List checks | | `-o, --output` {`cli`, `csv`, `cyclonedx`, `cyclonedx_json`, `spdx`, `json`, `junitxml`, `github_failed_only`, `gitlab_sast`, `sarif`} | Report output format. Add multiple outputs by using the flag multiple times (-o sarif -o cli) | | `--output-file-path OUTPUT_FILE_PATH` | Name of the output folder to save the chosen output formats. Advanced usage: By using -o cli -o junitxml --output-file-path console,results.xml the CLI output will be printed to the console and the JunitXML output to the file results.xml. | @@ -40,7 +40,7 @@ nav_order: 2 | `--repo-id REPO_ID` | Identity string of the repository, with form / | | `-b, --branch BRANCH` | Selected branch of the persisted repository. Only has effect when using the --bc-api-key flag | | `--skip-download` | Do not download any data from Bridgecrew. This will omit doc links, severities, etc., as well as custom policies and suppressions if using an API token. Note: it will prevent BC platform IDs from being available in Checkov. | -| `--use-enforcement-rules` | Use the Enforcement rules configured in the platform for hard/soft fail logic, where the matching enforcement rule (or the default rule if no match) determines the behavior: skip checks below soft-fail threshold, include checks equal to or above hard-fail threshold in hard-fail list, and include checks in between in soft-fail list. Overrides can be applied using --check, --skip-check, --soft-fail, --soft-fail-on, or --hard-fail-on, but the order of applying --check and --skip-check (as described under --check) still applies here. Requires BC or PC platform API key. | +| `--use-enforcement-rules` | Use the Enforcement rules configured in the platform for hard/soft fail logic, where the matching enforcement rule (or the default rule if no match) determines the behavior: skip checks below soft-fail threshold, include checks equal to or above hard-fail threshold in hard-fail list, and include checks in between in soft-fail list. Overrides can be applied using --check, --skip-check, --soft-fail, --soft-fail-on, or --hard-fail-on, but the order of applying --check and --skip-check (as described under --check) still applies here. Requires BC or PC platform API key. | | `--no-guide` | Deprecated - use --skip-download | | `--skip-suppressions` | Deprecated - use --skip-download | | `--skip-policy-download` | Deprecated - use --skip-download | diff --git a/kubernetes/requirements.txt b/kubernetes/requirements.txt index e59fafae1ce..5498c12cebc 100644 --- a/kubernetes/requirements.txt +++ b/kubernetes/requirements.txt @@ -1 +1 @@ -checkov==2.3.326 +checkov==2.3.328 diff --git a/tests/common/goget/test_goget_github.py b/tests/common/goget/test_goget_github.py index cbc2308f0a0..8ebc30fc3fd 100644 --- a/tests/common/goget/test_goget_github.py +++ b/tests/common/goget/test_goget_github.py @@ -17,49 +17,49 @@ def test_parse_source_and_subdirectory(self): git_url, subdir = getter._source_subdir() self.assertEqual("https://my-git.com/repository-name.git", git_url, "Parsed source url should contain hostname and path") self.assertEqual("/sub/path", subdir, "Parsed source subdirectory should contain absolute (sub)path") - + def test_parse_source_and_subdirectory_without_git(self): url = "https://my-git.com/repository-name//sub/path" getter = GitGetter(url) git_url, subdir = getter._source_subdir() self.assertEqual("https://my-git.com/repository-name", git_url, "Parsed source url should contain hostname and path") self.assertEqual("/sub/path", subdir, "Parsed source subdirectory should contain absolute (sub)path") - + def test_parse_source_with_query(self): url = "https://my-git.com/repository-name?key=value" getter = GitGetter(url) git_url, subdir = getter._source_subdir() self.assertEqual("https://my-git.com/repository-name?key=value", git_url, "Parsed source url should contain hostname, path and query") self.assertEqual("", subdir, "Parsed source subdirectory should be empty") - + def test_parse_source_and_subdirectory_with_query(self): url = "https://my-git.com/repository-name//sub/path?key=value" getter = GitGetter(url) git_url, subdir = getter._source_subdir() self.assertEqual("https://my-git.com/repository-name?key=value", git_url, "Parsed source url should contain hostname, path and query") self.assertEqual("/sub/path", subdir, "Parsed source subdirectory should contain absolute (sub)path") - + def test_parse_source_without_scheme(self): url = "my-git.com/repository-name" getter = GitGetter(url) git_url, subdir = getter._source_subdir() self.assertEqual("my-git.com/repository-name", git_url, "Parsed source url should contain hostname and path") self.assertEqual("", subdir, "Parsed source subdirectory should be empty") - + def test_parse_source_and_subdirectory_without_scheme(self): url = "my-git.com/repository-name//sub/path" getter = GitGetter(url) git_url, subdir = getter._source_subdir() self.assertEqual("my-git.com/repository-name", git_url, "Parsed source url should contain hostname ane path") self.assertEqual("/sub/path", subdir, "Parsed source subdirectory should contain absolute (sub)path") - + def test_parse_source_with_query_without_scheme(self): url = "my-git.com/repository-name?key=value" getter = GitGetter(url) git_url, subdir = getter._source_subdir() self.assertEqual("my-git.com/repository-name?key=value", git_url, "Parsed source url should contain hostname, path and query") self.assertEqual("", subdir, "Parsed source subdirectory should be empty") - + def test_parse_source_and_subdirectory_with_query_without_scheme(self): url = "my-git.com/repository-name//sub/path?key=value" getter = GitGetter(url) @@ -67,6 +67,43 @@ def test_parse_source_and_subdirectory_with_query_without_scheme(self): self.assertEqual("my-git.com/repository-name?key=value", git_url, "Parsed source url should contain hostname, path and query") self.assertEqual("/sub/path", subdir, "Parsed source subdirectory should contain absolute (sub)path") + def test_parse_tag(self): + url = "https://my-git.com/owner/repository-name?ref=tags/v1.2.3" + getter = GitGetter(url) + git_url = getter.extract_git_ref(url) + + self.assertEqual("https://my-git.com/owner/repository-name", git_url, + "Parsed source url is wrong") + self.assertEqual("v1.2.3", getter.tag, "Parsed source tag is wrong") + + def test_parse_tag_backward_compat(self): + url = "https://my-git.com/owner/repository-name?ref=v1.2.3" + getter = GitGetter(url) + git_url = getter.extract_git_ref(url) + + self.assertEqual("https://my-git.com/owner/repository-name", git_url, + "Parsed source url is wrong") + self.assertEqual("v1.2.3", getter.tag, "Parsed source tag is wrong") + + def test_parse_branch(self): + url = "https://my-git.com/owner/repository-name?ref=heads/omryBranch" + getter = GitGetter(url) + git_url = getter.extract_git_ref(url) + + self.assertEqual("https://my-git.com/owner/repository-name", git_url, + "Parsed source url is wrong") + self.assertEqual("omryBranch", getter.branch, "Parsed source branch is wrong") + + def test_parse_commit_id(self): + url = "https://my-git.com/owner/repository-name?ref=aa218f56b14c9653891f9e74264a383fa43fefbd" + getter = GitGetter(url) + git_url = getter.extract_git_ref(url) + + self.assertEqual("https://my-git.com/owner/repository-name", git_url, + "Parsed source url is wrong") + self.assertEqual("aa218f56b14c9653891f9e74264a383fa43fefbd", getter.commit_id, + "Parsed source commit_id is wrong") + if __name__ == '__main__': unittest.main() diff --git a/tests/kubernetes/image_referencer/provider/test_k8s.py b/tests/kubernetes/image_referencer/provider/test_k8s.py index 9e144e83e92..ebd062c38f3 100644 --- a/tests/kubernetes/image_referencer/provider/test_k8s.py +++ b/tests/kubernetes/image_referencer/provider/test_k8s.py @@ -50,16 +50,18 @@ def test_extract_images_from_resources(graph_framework): images = provider.extract_images_from_resources() # then - assert images == [ - Image( + assert len(images) == 2 + nginx_image = Image( file_path="/pod.yaml", name="nginx", start_line=1, end_line=16, related_resource_id="/pod.yaml:None", - ), - Image(file_path="/pod.yaml", name="busybox", start_line=1, end_line=16, related_resource_id="/pod.yaml:None"), - ] + ) + busybox_image = Image(file_path="/pod.yaml", name="busybox", start_line=1, end_line=16, + related_resource_id="/pod.yaml:None") + assert nginx_image in images + assert busybox_image in images @pytest.mark.parametrize("graph_framework", ['NETWORKX', 'IGRAPH']) diff --git a/tests/kubernetes/image_referencer/test_manager.py b/tests/kubernetes/image_referencer/test_manager.py index f544c4e7fde..e2e9e367734 100644 --- a/tests/kubernetes/image_referencer/test_manager.py +++ b/tests/kubernetes/image_referencer/test_manager.py @@ -7,6 +7,7 @@ from checkov.common.graph.graph_builder import CustomAttributes from checkov.kubernetes.image_referencer.manager import KubernetesImageReferencerManager from checkov.common.images.image_referencer import Image +from checkov.kubernetes.runner import Runner as KubernetesRunner @pytest.mark.parametrize("graph_framework", ['NETWORKX', 'IGRAPH']) @@ -54,3 +55,43 @@ def test_extract_images_from_resources(graph_framework): ), ] + +@pytest.fixture() +def graph_resource_with_containers_and_init_containers(): + resource = { + "file_path_": "/pod.yaml", + "__endline__": 16, + "__startline__": 1, + "spec": { + "initContainers": [ + { + "name": "test-container", + "image": "nginx", + }, + ], + "containers": [ + { + "name": "test-container", + "image": "nginx", + }, + ], + }, + "resource_type": "Pod", + } + graph = igraph.Graph() + graph.add_vertex( + name='duplicated_image', + block_type_='resource', + resource_type=resource[ + CustomAttributes.RESOURCE_TYPE] if CustomAttributes.RESOURCE_TYPE in resource else None, + attr=resource, + ) + return graph + + +def test_no_duplications_while_extracting_image_names(graph_resource_with_containers_and_init_containers: igraph.Graph): + manager = KubernetesImageReferencerManager(graph_connector=graph_resource_with_containers_and_init_containers) + images = manager.extract_images_from_resources() + assert len(images) == 1 + image = images[0] + assert image.name == 'nginx' diff --git a/tests/kustomize/test_runner_image_referencer.py b/tests/kustomize/test_runner_image_referencer.py index 416b47395c3..0803f2c8244 100644 --- a/tests/kustomize/test_runner_image_referencer.py +++ b/tests/kustomize/test_runner_image_referencer.py @@ -69,10 +69,10 @@ def test_deployment_resources(mocker: MockerFixture, allow_kustomize_file_edits: '/service.yaml'] assert len(sca_image_report.resources) == 2 - assert sca_image_report.resources == { - f'base/kustomization.yaml (wordpress:4.8-apache lines:{code_lines} (sha256:2460522297)).go', - f'overlays/prod/kustomization.yaml (wordpress:4.8-apache lines:{code_lines} (sha256:2460522297)).go' - } + assert f'base/kustomization.yaml (wordpress:4.8-apache lines:{code_lines} (sha256:2460522297)).go' in \ + sca_image_report.resources + assert f'overlays/prod/kustomization.yaml (wordpress:4.8-apache lines:{code_lines} (sha256:2460522297)).go' in \ + sca_image_report.resources assert len(sca_image_report.passed_checks) == 0 assert len(sca_image_report.failed_checks) == 6 assert len(sca_image_report.skipped_checks) == 0