From b62d3c4cd049764d1ceedf4a34d3524272600e68 Mon Sep 17 00:00:00 2001 From: rshamasnah Date: Wed, 15 Mar 2023 16:14:43 +0200 Subject: [PATCH] support scope as list --- checkov/common/checks_infra/checks_parser.py | 62 ++++++++++++++++++-- 1 file changed, 57 insertions(+), 5 deletions(-) diff --git a/checkov/common/checks_infra/checks_parser.py b/checkov/common/checks_infra/checks_parser.py index 311427397c5..b8eb406c949 100644 --- a/checkov/common/checks_infra/checks_parser.py +++ b/checkov/common/checks_infra/checks_parser.py @@ -1,6 +1,7 @@ from __future__ import annotations -from typing import Dict, Any, List, Optional, Type, TYPE_CHECKING +import logging +from typing import Dict, Any, List, Optional, Type, TYPE_CHECKING, cast from checkov.common.checks_infra.solvers import ( EqualsAttributeSolver, @@ -53,6 +54,7 @@ NumberOfWordsGreaterThanOrEqualAttributeSolver, NumberOfWordsLessThanAttributeSolver, NumberOfWordsLessThanOrEqualAttributeSolver, + NotWithinAttributeSolver, ) from checkov.common.checks_infra.solvers.connections_solvers.connection_one_exists_solver import \ ConnectionOneExistsSolver @@ -80,6 +82,7 @@ "contains": ContainsAttributeSolver, "not_exists": NotExistsAttributeSolver, "within": WithinAttributeSolver, + "not_within": NotWithinAttributeSolver, "not_contains": NotContainsAttributeSolver, "starting_with": StartingWithAttributeSolver, "not_starting_with": NotStartingWithAttributeSolver, @@ -146,21 +149,56 @@ JSONPATH_PREFIX = "jsonpath_" -class NXGraphCheckParser(BaseGraphCheckParser): +class GraphCheckParser(BaseGraphCheckParser): + def validate_check_config(self, file_path: str, raw_check: dict[str, dict[str, Any]]) -> bool: + missing_fields = [] + + # check existence of metadata block + if "metadata" in raw_check: + metadata = raw_check["metadata"] + if "id" not in metadata: + missing_fields.append("metadata.id") + if "name" not in metadata: + missing_fields.append("metadata.name") + if "category" not in metadata: + missing_fields.append("metadata.category") + else: + missing_fields.extend(("metadata.id", "metadata.name", "metadata.category")) + + # check existence of definition block + if "definition" not in raw_check: + missing_fields.append("definition") + + if missing_fields: + logging.warning(f"Custom policy {file_path} is missing required fields {', '.join(missing_fields)}") + return False + + # check if definition block is not obviously invalid + definition = raw_check["definition"] + if not isinstance(definition, (list, dict)): + logging.warning( + f"Custom policy {file_path} has an invalid 'definition' block type '{type(definition).__name__}', " + "needs to be either a 'list' or 'dict'" + ) + return False + + return True + def parse_raw_check(self, raw_check: Dict[str, Dict[str, Any]], **kwargs: Any) -> BaseGraphCheck: policy_definition = raw_check.get("definition", {}) - check = self._parse_raw_check(policy_definition, kwargs.get("resources_types")) + check = self._parse_raw_check(policy_definition, kwargs.get("resources_types"), raw_check) check.id = raw_check.get("metadata", {}).get("id", "") check.name = raw_check.get("metadata", {}).get("name", "") check.category = raw_check.get("metadata", {}).get("category", "") check.frameworks = raw_check.get("metadata", {}).get("frameworks", []) check.guideline = raw_check.get("metadata", {}).get("guideline") + check.check_path = kwargs.get("check_path", "") solver = self.get_check_solver(check) check.set_solver(solver) return check - def _parse_raw_check(self, raw_check: Dict[str, Any], resources_types: Optional[List[str]]) -> BaseGraphCheck: + def _parse_raw_check(self, raw_check: Dict[str, Any], resources_types: Optional[List[str]], json_check: Dict[str, Any]) -> BaseGraphCheck: check = BaseGraphCheck() complex_operator = get_complex_operator(raw_check) if complex_operator: @@ -174,7 +212,7 @@ def _parse_raw_check(self, raw_check: Dict[str, Any], resources_types: Optional[ sub_solvers = [sub_solvers] for sub_solver in sub_solvers: - check.sub_checks.append(self._parse_raw_check(sub_solver, resources_types)) + check.sub_checks.append(self._parse_raw_check(sub_solver, resources_types, json_check)) resources_types_of_sub_solvers = [ force_list(q.resource_types) for q in check.sub_checks if q is not None and q.resource_types is not None ] @@ -190,6 +228,15 @@ def _parse_raw_check(self, raw_check: Dict[str, Any], resources_types: Optional[ or (isinstance(resource_type, list) and resource_type[0].lower() == "all") ): check.resource_types = resources_types or [] + elif "provider" in resource_type: + provider = json_check.get("scope", {}).get("provider", "") + provider_type = "" + if cast(str, provider): + provider_type = provider.lower() + elif cast(list, provider): + provider_type = provider[0].lower() + check.resource_types.append(f"provider.{provider_type}") + else: check.resource_types = resource_type @@ -251,6 +298,11 @@ def get_check_solver(self, check: BaseGraphCheck) -> BaseSolver: return solver +class NXGraphCheckParser(GraphCheckParser): + # TODO: delete after downstream adjustments + pass + + def get_complex_operator(raw_check: Dict[str, Any]) -> Optional[str]: for operator in operators_to_complex_solver_classes.keys(): if raw_check.get(operator):