Skip to content

Commit

Permalink
support scope as list
Browse files Browse the repository at this point in the history
  • Loading branch information
rshamasnah committed Mar 15, 2023
1 parent 95e15ed commit b62d3c4
Showing 1 changed file with 57 additions and 5 deletions.
62 changes: 57 additions & 5 deletions checkov/common/checks_infra/checks_parser.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

from typing import Dict, Any, List, Optional, Type, TYPE_CHECKING
import logging
from typing import Dict, Any, List, Optional, Type, TYPE_CHECKING, cast

from checkov.common.checks_infra.solvers import (
EqualsAttributeSolver,
Expand Down Expand Up @@ -53,6 +54,7 @@
NumberOfWordsGreaterThanOrEqualAttributeSolver,
NumberOfWordsLessThanAttributeSolver,
NumberOfWordsLessThanOrEqualAttributeSolver,
NotWithinAttributeSolver,
)
from checkov.common.checks_infra.solvers.connections_solvers.connection_one_exists_solver import \
ConnectionOneExistsSolver
Expand Down Expand Up @@ -80,6 +82,7 @@
"contains": ContainsAttributeSolver,
"not_exists": NotExistsAttributeSolver,
"within": WithinAttributeSolver,
"not_within": NotWithinAttributeSolver,
"not_contains": NotContainsAttributeSolver,
"starting_with": StartingWithAttributeSolver,
"not_starting_with": NotStartingWithAttributeSolver,
Expand Down Expand Up @@ -146,21 +149,56 @@
JSONPATH_PREFIX = "jsonpath_"


class NXGraphCheckParser(BaseGraphCheckParser):
class GraphCheckParser(BaseGraphCheckParser):
def validate_check_config(self, file_path: str, raw_check: dict[str, dict[str, Any]]) -> bool:
missing_fields = []

# check existence of metadata block
if "metadata" in raw_check:
metadata = raw_check["metadata"]
if "id" not in metadata:
missing_fields.append("metadata.id")
if "name" not in metadata:
missing_fields.append("metadata.name")
if "category" not in metadata:
missing_fields.append("metadata.category")
else:
missing_fields.extend(("metadata.id", "metadata.name", "metadata.category"))

# check existence of definition block
if "definition" not in raw_check:
missing_fields.append("definition")

if missing_fields:
logging.warning(f"Custom policy {file_path} is missing required fields {', '.join(missing_fields)}")
return False

# check if definition block is not obviously invalid
definition = raw_check["definition"]
if not isinstance(definition, (list, dict)):
logging.warning(
f"Custom policy {file_path} has an invalid 'definition' block type '{type(definition).__name__}', "
"needs to be either a 'list' or 'dict'"
)
return False

return True

def parse_raw_check(self, raw_check: Dict[str, Dict[str, Any]], **kwargs: Any) -> BaseGraphCheck:
policy_definition = raw_check.get("definition", {})
check = self._parse_raw_check(policy_definition, kwargs.get("resources_types"))
check = self._parse_raw_check(policy_definition, kwargs.get("resources_types"), raw_check)
check.id = raw_check.get("metadata", {}).get("id", "")
check.name = raw_check.get("metadata", {}).get("name", "")
check.category = raw_check.get("metadata", {}).get("category", "")
check.frameworks = raw_check.get("metadata", {}).get("frameworks", [])
check.guideline = raw_check.get("metadata", {}).get("guideline")
check.check_path = kwargs.get("check_path", "")
solver = self.get_check_solver(check)
check.set_solver(solver)

return check

def _parse_raw_check(self, raw_check: Dict[str, Any], resources_types: Optional[List[str]]) -> BaseGraphCheck:
def _parse_raw_check(self, raw_check: Dict[str, Any], resources_types: Optional[List[str]], json_check: Dict[str, Any]) -> BaseGraphCheck:
check = BaseGraphCheck()
complex_operator = get_complex_operator(raw_check)
if complex_operator:
Expand All @@ -174,7 +212,7 @@ def _parse_raw_check(self, raw_check: Dict[str, Any], resources_types: Optional[
sub_solvers = [sub_solvers]

for sub_solver in sub_solvers:
check.sub_checks.append(self._parse_raw_check(sub_solver, resources_types))
check.sub_checks.append(self._parse_raw_check(sub_solver, resources_types, json_check))
resources_types_of_sub_solvers = [
force_list(q.resource_types) for q in check.sub_checks if q is not None and q.resource_types is not None
]
Expand All @@ -190,6 +228,15 @@ def _parse_raw_check(self, raw_check: Dict[str, Any], resources_types: Optional[
or (isinstance(resource_type, list) and resource_type[0].lower() == "all")
):
check.resource_types = resources_types or []
elif "provider" in resource_type:
provider = json_check.get("scope", {}).get("provider", "")
provider_type = ""
if cast(str, provider):
provider_type = provider.lower()
elif cast(list, provider):
provider_type = provider[0].lower()
check.resource_types.append(f"provider.{provider_type}")

else:
check.resource_types = resource_type

Expand Down Expand Up @@ -251,6 +298,11 @@ def get_check_solver(self, check: BaseGraphCheck) -> BaseSolver:
return solver


class NXGraphCheckParser(GraphCheckParser):
# TODO: delete after downstream adjustments
pass


def get_complex_operator(raw_check: Dict[str, Any]) -> Optional[str]:
for operator in operators_to_complex_solver_classes.keys():
if raw_check.get(operator):
Expand Down

0 comments on commit b62d3c4

Please sign in to comment.