Skip to content

Commit

Permalink
feat(kubernetes): Added new endpoint for both helm and kustomize (#5481)
Browse files Browse the repository at this point in the history
* Created new api for helm and kustomize to not need the file content to run get_binary_output

* Added kustomize test to see the results are the same

* Added helm test as well

* mypy
  • Loading branch information
bo156 committed Aug 24, 2023
1 parent cade73f commit 41782e0
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 10 deletions.
8 changes: 8 additions & 0 deletions checkov/helm/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,14 @@ def _get_target_dir(chart_item: tuple[str, dict[str, Any]], root_folder: str, ta
target_dir = target_dir[:-len(chart_name)]
return target_dir

@staticmethod
def get_binary_output_from_directory(chart_dir: str, target_dir: str, helm_command: str,
runner_filter: RunnerFilter, timeout: int = 3600) \
-> tuple[bytes, tuple[str, dict[str, Any]]] | tuple[None, None]:
chart_meta = Runner.parse_helm_chart_details(chart_dir)
chart_item = (chart_dir, chart_meta or {})
return Runner.get_binary_output(chart_item, target_dir, helm_command, runner_filter, timeout)

@staticmethod
def get_binary_output(
chart_item: tuple[str, dict[str, Any]], target_dir: str, helm_command: str, runner_filter: RunnerFilter, timeout: int = 3600
Expand Down
33 changes: 23 additions & 10 deletions checkov/kustomize/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -472,28 +472,31 @@ def check_system_deps(self) -> str | None:
logging.info(f"Could not find usable tools locally to process {self.check_type} checks. Framework will be disabled for this run.")
return self.check_type

def _handle_overlay_case(self, file_path: str) -> None:
def _handle_overlay_case(self, file_path: str,
kustomizeProcessedFolderAndMeta: dict[str, dict[str, Any]] | None = None) -> None:
if kustomizeProcessedFolderAndMeta is None:
kustomizeProcessedFolderAndMeta = self.kustomizeProcessedFolderAndMeta
for parent in pathlib.Path(file_path).parents:
for potentialBase in self.potentialBases:
pathlib_base_object = pathlib.Path(potentialBase)
potential_base_path = pathlib_base_object.parents[1]
if parent == potential_base_path.resolve():
self.kustomizeProcessedFolderAndMeta[file_path]['calculated_bases'] = str(pathlib_base_object.parent)
kustomizeProcessedFolderAndMeta[file_path]['calculated_bases'] = str(pathlib_base_object.parent)
try:
relativeToFullPath = f"{file_path}/{self.kustomizeProcessedFolderAndMeta[file_path]['referenced_bases'][0]}"
if pathlib.Path(self.kustomizeProcessedFolderAndMeta[file_path]['calculated_bases']) == pathlib.Path(relativeToFullPath).resolve():
self.kustomizeProcessedFolderAndMeta[file_path]['validated_base'] = str(pathlib.Path(self.kustomizeProcessedFolderAndMeta[file_path]['calculated_bases']))
checkov_kustomize_env_name_by_path = str(pathlib.Path(file_path).relative_to(pathlib.Path(self.kustomizeProcessedFolderAndMeta[file_path]['calculated_bases']).parent))
self.kustomizeProcessedFolderAndMeta[file_path]['overlay_name'] = checkov_kustomize_env_name_by_path
logging.debug(f"Overlay based on {self.kustomizeProcessedFolderAndMeta[file_path]['validated_base']}, naming overlay {checkov_kustomize_env_name_by_path} for Checkov Results.")
relativeToFullPath = f"{file_path}/{kustomizeProcessedFolderAndMeta[file_path]['referenced_bases'][0]}"
if pathlib.Path(kustomizeProcessedFolderAndMeta[file_path]['calculated_bases']) == pathlib.Path(relativeToFullPath).resolve():
kustomizeProcessedFolderAndMeta[file_path]['validated_base'] = str(pathlib.Path(kustomizeProcessedFolderAndMeta[file_path]['calculated_bases']))
checkov_kustomize_env_name_by_path = str(pathlib.Path(file_path).relative_to(pathlib.Path(kustomizeProcessedFolderAndMeta[file_path]['calculated_bases']).parent))
kustomizeProcessedFolderAndMeta[file_path]['overlay_name'] = checkov_kustomize_env_name_by_path
logging.debug(f"Overlay based on {kustomizeProcessedFolderAndMeta[file_path]['validated_base']}, naming overlay {checkov_kustomize_env_name_by_path} for Checkov Results.")
else:
checkov_kustomize_env_name_by_path = pathlib.Path(file_path).stem
self.kustomizeProcessedFolderAndMeta[file_path]['overlay_name'] = checkov_kustomize_env_name_by_path
kustomizeProcessedFolderAndMeta[file_path]['overlay_name'] = checkov_kustomize_env_name_by_path
logging.debug(f"Could not confirm base dir for Kustomize overlay/env. Using {checkov_kustomize_env_name_by_path} for Checkov Results.")

except KeyError:
checkov_kustomize_env_name_by_path = pathlib.Path(file_path).stem
self.kustomizeProcessedFolderAndMeta[file_path]['overlay_name'] = checkov_kustomize_env_name_by_path
kustomizeProcessedFolderAndMeta[file_path]['overlay_name'] = checkov_kustomize_env_name_by_path
logging.debug(f"Could not confirm base dir for Kustomize overlay/env. Using {checkov_kustomize_env_name_by_path} for Checkov Results.")

@staticmethod
Expand Down Expand Up @@ -593,6 +596,16 @@ def _get_env_or_base_path_prefix(

return env_or_base_path_prefix

def get_binary_output_from_directory(
self,
file_path: str,
template_renderer_command: str,
) -> tuple[bytes, str] | tuple[None, None]:
kustomizeProcessedFolderAndMeta = {file_path: self._parseKustomization(file_path)}
if kustomizeProcessedFolderAndMeta[file_path].get('type') == 'overlay':
self._handle_overlay_case(file_path, kustomizeProcessedFolderAndMeta)
return self.get_binary_output(file_path, kustomizeProcessedFolderAndMeta, template_renderer_command)

def get_binary_output(
self,
file_path: str,
Expand Down
16 changes: 16 additions & 0 deletions tests/helm/test_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,22 @@ def test_runner_invalid_chart(self):

self.assertEqual(len(report.failed_checks), 0)

@unittest.skipIf(not helm_exists(), "helm not installed")
def test_get_binary_output_from_directory_equals_to_get_binary_result(self):
current_dir = os.path.dirname(os.path.realpath(__file__))
scan_dir_path = os.path.join(current_dir, "runner", "resources", "schema-registry")

runner_filter = RunnerFilter(framework=['helm'], use_enforcement_rules=False)

chart_meta = Runner.parse_helm_chart_details(scan_dir_path)
chart_item = (scan_dir_path, chart_meta)
regular_result = Runner.get_binary_output(chart_item, target_dir='./tmp', helm_command="helm",
runner_filter=runner_filter)
result_from_directory = Runner.get_binary_output_from_directory(str(scan_dir_path),
target_dir='./tmp', helm_command="helm",
runner_filter=runner_filter)
assert regular_result == result_from_directory


if __name__ == "__main__":
unittest.main()
18 changes: 18 additions & 0 deletions tests/kustomize/test_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,24 @@ def test_no_file_type_exists(self):
all_checks = report.failed_checks + report.passed_checks
self.assertEqual(len(all_checks), 0) # we should no get any results

@unittest.skipIf(os.name == "nt" or not kustomize_exists(), "kustomize not installed or Windows OS")
def test_get_binary_output_from_directory_equals_to_get_binary_result(self):
scan_dir_path = Path(__file__).parent / "runner/resources/example/no_type"
dir_rel_path = os.path.relpath(scan_dir_path).replace('\\', '/')
runner = Runner()
runner.templateRendererCommand = "kustomize"
runner.templateRendererCommandOptions = "build"

# Runs the runner fully just to build `runner.kustomizeProcessedFolderAndMeta`
_ = runner.run(root_folder=dir_rel_path, external_checks_dir=None,
runner_filter=RunnerFilter(framework=['kustomize']))
regular_result = runner.get_binary_output(str(scan_dir_path), runner.kustomizeProcessedFolderAndMeta,
runner.templateRendererCommand)
result_from_directory = runner.get_binary_output_from_directory(str(scan_dir_path),
runner.templateRendererCommand)
assert regular_result == result_from_directory



if __name__ == '__main__':
unittest.main()

0 comments on commit 41782e0

Please sign in to comment.