Skip to content

Commit

Permalink
Merge branch 'main' into dependabot/github_actions/actions/upload-pag…
Browse files Browse the repository at this point in the history
…es-artifact-2.0.0
  • Loading branch information
gruebel authored Jul 17, 2023
2 parents 0d444a9 + 70a42f1 commit 9f17673
Show file tree
Hide file tree
Showing 24 changed files with 334 additions and 194 deletions.
16 changes: 14 additions & 2 deletions checkov/common/goget/github/get_git.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
git_import_error = e

COMMIT_ID_PATTERN = re.compile(r"\?(ref=)(?P<commit_id>([0-9a-f]{40}))")
TAG_PATTERN = re.compile(r'\?(ref=)(?P<tag>(.*))')
TAG_PATTERN = re.compile(r'\?(ref=)(?P<tag>(.*))') # technically should be with ?ref=tags/ but this catches both
BRANCH_PATTERN = re.compile(r'\?(ref=heads/)(?P<branch>(.*))')


class GitGetter(BaseGetter):
Expand All @@ -23,13 +24,22 @@ def __init__(self, url: str, create_clone_and_result_dirs: bool = True) -> None:
self.create_clone_and_res_dirs = create_clone_and_result_dirs
self.tag = ''
self.commit_id: str | None = None
self.branch = ''

if "?ref" in url:
url = self.extract_git_ref(url=url)

super().__init__(url)

def extract_git_ref(self, url: str) -> str:
search_branch = re.search(BRANCH_PATTERN, url)
if search_branch:
self.branch = search_branch.group("branch")
# remove heads/ from ref= to get actual branch name
# self.branch = re.sub('heads.*/', '', url)
url = re.sub(BRANCH_PATTERN, '', url)
return url

search_commit_id = re.search(COMMIT_ID_PATTERN, url)
if search_commit_id:
self.commit_id = search_commit_id.group("commit_id")
Expand Down Expand Up @@ -71,7 +81,9 @@ def do_get(self) -> str:
def _clone(self, git_url: str, clone_dir: str) -> None:
self.logger.debug(f"cloning {self.url if '@' not in self.url else self.url.split('@')[1]} to {clone_dir}")
with temp_environ(GIT_TERMINAL_PROMPT="0"): # disables user prompts originating from GIT
if self.commit_id:
if self.branch:
Repo.clone_from(git_url, clone_dir, branch=self.branch, depth=1) # depth=1 for shallow clone
elif self.commit_id: # no commit id support for branch
repo = Repo.clone_from(git_url, clone_dir, no_checkout=True) # need to be a full git clone
repo.git.checkout(self.commit_id)
elif self.tag:
Expand Down
5 changes: 3 additions & 2 deletions checkov/common/util/ext_argument_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,9 @@ def add_parser_args(self) -> None:
self.add(
"--external-checks-git",
action="append",
help="Github url of external checks to be added. \n you can specify a subdirectory after a "
"double-slash //. \n cannot be used together with --external-checks-dir",
help="Github url of external checks to be added.\n you can specify a subdirectory after a double-slash //."
"\n possible to use ?ref=tags/tagName or ?ref=heads/branchName or ?ref=commit_id"
"\n cannot be used together with --external-checks-dir",
)
self.add(
"-l",
Expand Down
64 changes: 21 additions & 43 deletions checkov/kubernetes/image_referencer/provider/k8s.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,77 +19,55 @@ def __init__(self, graph_connector: DiGraph) -> None:


def extract_images_from_cron_job(resource: dict[str, Any]) -> list[str]:
image_names: list[str] = []

spec = find_in_dict(input_dict=resource, key_path="spec/jobTemplate/spec/template/spec")
if isinstance(spec, dict):
containers = spec.get("containers")
image_names.extend(extract_images_from_containers(containers=containers))

containers = spec.get("initContainers")
image_names.extend(extract_images_from_containers(containers=containers))

return image_names
return _extract_images_from_spec(spec)


def extract_images_from_pod(resource: dict[str, Any]) -> list[str]:
image_names: list[str] = []

spec = resource.get("spec")
if isinstance(spec, dict):
containers = spec.get("containers")
image_names.extend(extract_images_from_containers(containers=containers))

containers = spec.get("initContainers")
image_names.extend(extract_images_from_containers(containers=containers))

return image_names
return _extract_images_from_spec(spec)


def extract_images_from_pod_template(resource: dict[str, Any]) -> list[str]:
# the 'PodTemplate' object is usually not defined by the user, but rather used by Kubernetes internally
image_names: list[str] = []

spec = find_in_dict(input_dict=resource, key_path="template/spec")
if isinstance(spec, dict):
containers = spec.get("containers")
image_names.extend(extract_images_from_containers(containers=containers))

containers = spec.get("initContainers")
image_names.extend(extract_images_from_containers(containers=containers))

return image_names
return _extract_images_from_spec(spec)


def extract_images_from_template(resource: dict[str, Any]) -> list[str]:
image_names: list[str] = []

spec = find_in_dict(input_dict=resource, key_path="spec/template/spec")
if isinstance(spec, dict):
containers = spec.get("containers")
image_names.extend(extract_images_from_containers(containers=containers))

containers = spec.get("initContainers")
image_names.extend(extract_images_from_containers(containers=containers))

return image_names
return _extract_images_from_spec(spec)


def extract_images_from_containers(containers: Any) -> list[str]:
def extract_images_from_containers(containers: Any) -> set[str]:
"""Helper function to extract image names from containers block"""

image_names: list[str] = []
image_names: set[str] = set()

if isinstance(containers, list):
for container in containers:
if isinstance(container, dict):
image = container.get("image")
if image and isinstance(image, str):
image_names.append(image)
image_names.add(image)

return image_names


def _extract_images_from_spec(spec: dict[str, Any] | None) -> list[str]:
image_names: set[str] = set()

if isinstance(spec, dict):
containers = spec.get("containers")
image_names.update(extract_images_from_containers(containers=containers))

containers = spec.get("initContainers")
image_names.update(extract_images_from_containers(containers=containers))

# Makes sure we return no duplications
return list(image_names)


# needs to be at the bottom to add the defined functions
SUPPORTED_K8S_IMAGE_RESOURCE_TYPES: "dict[str, _ExtractImagesCallableAlias]" = {
"CronJob": extract_images_from_cron_job,
Expand Down
2 changes: 1 addition & 1 deletion checkov/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -644,7 +644,7 @@ def commit_repository(self) -> str | None:
def get_external_checks_dir(self) -> list[str]:
external_checks_dir: "list[str]" = self.config.external_checks_dir
if self.config.external_checks_git:
git_getter = GitGetter(self.config.external_checks_git[0])
git_getter = GitGetter(url=self.config.external_checks_git[0])
external_checks_dir = [git_getter.get()]
atexit.register(shutil.rmtree, str(Path(external_checks_dir[0]).parent))
return external_checks_dir
Expand Down
14 changes: 8 additions & 6 deletions checkov/terraform/checks/resource/aws/MQBrokerVersion.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
from checkov.common.models.enums import CheckCategories, CheckResult
from checkov.terraform.checks.resource.base_resource_check import BaseResourceCheck

minimumActiveMQ = 5.16
minimumRabbitMQ = 3.8
ENGINE_VERSION_PATTERN = re.compile(r"(\d+\.\d+.\d+)")
ENGINE_VERSION_SHORT_PATTERN = re.compile(r"(\d+\.\d+)")
MINIMUM_ACTIVEMQ_VERSION = 5.16
MINIMUM_RABBITMQ_VERSION = 3.8


class MQBrokerVersion(BaseResourceCheck):
Expand All @@ -19,15 +21,15 @@ def scan_resource_conf(self, conf) -> CheckResult:
if conf.get("engine_type"):
mq_type = conf.get("engine_type")[0]
semantic = conf.get("engine_version", [''])[0]
if not re.search(r'(\d+\.\d+.\d+)', semantic):
if not re.search(ENGINE_VERSION_PATTERN, semantic):
return CheckResult.UNKNOWN
version = float(re.search(r'(\d+\.\d+)', semantic).group())
version = float(re.search(ENGINE_VERSION_SHORT_PATTERN, semantic).group())
if mq_type in 'ActiveMQ':
if version >= minimumActiveMQ:
if version >= MINIMUM_ACTIVEMQ_VERSION:
return CheckResult.PASSED

if mq_type in 'RabbitMQ':
if version >= minimumRabbitMQ:
if version >= MINIMUM_RABBITMQ_VERSION:
return CheckResult.PASSED

return CheckResult.FAILED
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,16 @@

import dpath
import re

from checkov.terraform.checks.resource.base_resource_check import BaseResourceCheck
from checkov.common.models.enums import CheckResult, CheckCategories
from checkov.common.models.consts import ANY_VALUE
from checkov.common.util.type_forcers import force_list
from checkov.terraform.graph_builder.utils import get_referenced_vertices_in_value
from checkov.terraform.parser_functions import handle_dynamic_values

ARRAY_INDEX_PATTERN = re.compile(r"^\[?\d+]?$")


class BaseResourceValueCheck(BaseResourceCheck):
def __init__(
Expand All @@ -32,7 +35,7 @@ def _filter_key_path(path: str) -> List[str]:
:param path: valid JSONPath of an attribute
:return: List of named attributes with respect to the input JSONPath order
"""
return [x for x in path.split("/") if not re.search(re.compile(r"^\[?\d+]?$"), x)]
return [x for x in path.split("/") if not re.search(ARRAY_INDEX_PATTERN, x)]

@staticmethod
def _is_nesting_key(inspected_attributes: List[str], key: List[str]) -> bool:
Expand Down
3 changes: 1 addition & 2 deletions checkov/terraform/context_parsers/base_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,7 @@ def _read_file_lines(self) -> List[Tuple[int, str]]:

@staticmethod
def is_optional_comment_line(line: str) -> bool:
line_without_whitespace = line.replace(" ", "")
return "checkov:skip=" in line_without_whitespace or "bridgecrew:skip=" in line_without_whitespace
return "checkov:skip=" in line or "bridgecrew:skip=" in line

def _collect_skip_comments(self, definition_blocks: List[Dict[str, Any]]) -> Dict[str, Any]:
"""
Expand Down
5 changes: 0 additions & 5 deletions checkov/terraform/evaluation/base_variable_evaluation.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,6 @@ def evaluate_variables(self) -> Any:
"""
raise NotImplementedError()

@staticmethod
def _is_variable_only_expression(assignment_regex: str, entry_expression: str) -> bool:
exact_assignment_regex = re.compile(r"^" + assignment_regex + r"$")
return len(re.findall(exact_assignment_regex, entry_expression)) > 0

@staticmethod
def extract_context_path(definition_path: str) -> Tuple[str, str]:
"""
Expand Down
11 changes: 7 additions & 4 deletions checkov/terraform/graph_builder/foreach/abstract_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,10 +252,13 @@ def extract_from_list(val: Any) -> Any:

@staticmethod
def need_to_add_quotes(code: str, key: str) -> bool:
patterns = [r'lower\(' + key + r'\)', r'upper\(' + key + r'\)']
for pattern in patterns:
if re.search(pattern, code):
return True
if "lower" in code or "upper" in code:
patterns = (r'lower\(' + key + r'\)', r'upper\(' + key + r'\)')
for pattern in patterns:
if re.search(pattern, code):
return True

if f'[{key}]' in code:
return True

return False
9 changes: 5 additions & 4 deletions checkov/terraform/graph_builder/graph_components/blocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,11 @@ def _collect_dynamic_dependent_keys(self, dynamic_block_name: str, value: str |
dynamic_content_path: List[str], dynamic_changed_attributes: List[str]) -> None:
if isinstance(value, str):
dynamic_ref = f'{dynamic_block_name}.value'
interpolation_matches = re.findall(INTERPOLATION_EXPR, value)
for match in interpolation_matches:
if dynamic_ref in match:
dynamic_changed_attributes.append(key_path)
if "${" in value:
interpolation_matches = re.findall(INTERPOLATION_EXPR, value)
for match in interpolation_matches:
if dynamic_ref in match:
dynamic_changed_attributes.append(key_path)
elif isinstance(value, list):
for idx, sub_value in enumerate(value):
self._collect_dynamic_dependent_keys(
Expand Down
Loading

0 comments on commit 9f17673

Please sign in to comment.