Skip to content

Commit

Permalink
feat(graph): add experimental debug output for graph check evaluation (
Browse files Browse the repository at this point in the history
…#5257)

* add debug output for graph check evaluation

* fix linting

* add tests
  • Loading branch information
gruebel committed Jun 28, 2023
1 parent 95385f7 commit 6b7c626
Show file tree
Hide file tree
Showing 10 changed files with 431 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from igraph import Graph
from bc_jsonpath_ng.ext import parse

from checkov.common.graph.checks_infra import debug
from checkov.common.graph.checks_infra.enums import SolverType
from checkov.common.graph.checks_infra.solvers.base_solver import BaseSolver

Expand Down Expand Up @@ -106,21 +107,39 @@ def get_operation(self, vertex: Dict[str, Any]) -> Optional[bool]:
if not resource_variable_dependant or resource_variable_dependant and policy_variable_dependant:
filtered_attribute_matches.append(attribute)
if attribute_matches:
if self.is_jsonpath_check:
if self.resource_type_pred(vertex, self.resource_types) and all(
self._get_operation(vertex=vertex, attribute=attr) for attr in filtered_attribute_matches):
return True if len(attribute_matches) == len(filtered_attribute_matches) else None
return False

if self.resource_type_pred(vertex, self.resource_types) and any(
self._get_operation(vertex=vertex, attribute=attr) for attr in filtered_attribute_matches):
return True
return False if len(attribute_matches) == len(filtered_attribute_matches) else None

return self.resource_type_pred(vertex, self.resource_types) and self._get_operation(
result = self._evaluate_attribute_matches(
vertex=vertex,
attribute_matches=attribute_matches,
filtered_attribute_matches=filtered_attribute_matches,
)
if result is not None:
# skip unknown
debug.attribute_block(
resource_types=self.resource_types,
attribute=self.attribute,
operator=self.operator,
value=self.value,
resource=vertex,
status="passed" if result is True else "failed",
)

return result

result = self.resource_type_pred(vertex, self.resource_types) and self._get_operation(
vertex=vertex, attribute=self.attribute
)

debug.attribute_block(
resource_types=self.resource_types,
attribute=self.attribute,
operator=self.operator,
value=self.value,
resource=vertex,
status="passed" if result is True else "failed",
)

return result

def _get_operation(self, vertex: Dict[str, Any], attribute: Optional[str]) -> bool:
raise NotImplementedError

Expand All @@ -139,6 +158,22 @@ def _process_node(
else:
failed_vertices.append(data)

def _evaluate_attribute_matches(
self, vertex: dict[str, Any], attribute_matches: list[str], filtered_attribute_matches: list[str]
) -> bool | None:
if self.is_jsonpath_check:
if self.resource_type_pred(vertex, self.resource_types) and all(
self._get_operation(vertex=vertex, attribute=attr) for attr in filtered_attribute_matches
):
return True if len(attribute_matches) == len(filtered_attribute_matches) else None
return False

if self.resource_type_pred(vertex, self.resource_types) and any(
self._get_operation(vertex=vertex, attribute=attr) for attr in filtered_attribute_matches
):
return True
return False if len(attribute_matches) == len(filtered_attribute_matches) else None

def get_attribute_matches(self, vertex: Dict[str, Any]) -> List[str]:
try:
attribute_matches: List[str] = []
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from igraph import Graph

from checkov.common.graph.checks_infra import debug
from checkov.common.graph.checks_infra.enums import SolverType
from checkov.common.graph.checks_infra.solvers.base_solver import BaseSolver

Expand Down Expand Up @@ -49,6 +50,14 @@ def run(self, graph_connector: LibraryGraph) -> Tuple[List[Dict[str, Any]], List
passed_vertices.append(data)
else:
failed_vertices.append(data)

debug.complex_connection_block(
solvers=self.solvers,
operator=self.operator,
passed_resources=passed_vertices,
failed_resources=failed_vertices,
)

return passed_vertices, failed_vertices, unknown_vertices

for _, data in graph_connector.nodes(data=True):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import itertools
from typing import List, Optional, Dict, Any, Tuple, TYPE_CHECKING

from checkov.common.graph.checks_infra import debug
from checkov.common.graph.checks_infra.enums import SolverType
from checkov.common.graph.checks_infra.solvers.base_solver import BaseSolver
from checkov.common.checks_infra.solvers.attribute_solvers.base_attribute_solver import BaseAttributeSolver
Expand Down Expand Up @@ -52,6 +53,9 @@ def filter_results(
passed = self.filter_duplicates(passed)
failed = self.filter_duplicates(failed)
unknown = self.filter_duplicates(unknown)

debug.complex_connection_block(solvers=self.solvers, operator=self.operator, passed_resources=passed, failed_resources=failed)

return passed, failed, unknown

def get_sorted_connection_solvers(self) -> List[BaseConnectionSolver]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

from igraph import Graph

from checkov.common.graph.checks_infra import debug

try:
from networkx import edge_dfs
except ImportError:
Expand Down Expand Up @@ -36,6 +38,21 @@ def __init__(

def get_operation(
self, graph_connector: LibraryGraph
) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]], List[Dict[str, Any]]]:
passed, failed, unknown = self._get_operation(graph_connector=graph_connector)

debug.connection_block(
resource_types=self.resource_types,
connected_resource_types=self.connected_resources_types,
operator=self.operator,
passed_resources=passed,
failed_resources=failed,
)

return passed, failed, unknown

def _get_operation(
self, graph_connector: LibraryGraph
) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]], List[Dict[str, Any]]]:
passed: List[Dict[str, Any]] = []
failed: List[Dict[str, Any]] = []
Expand Down Expand Up @@ -68,9 +85,13 @@ def get_operation(

if origin_attributes and destination_attributes_list:
for destination_attributes in destination_attributes_list:
self.populate_checks_results(origin_attributes=origin_attributes,
destination_attributes=destination_attributes, passed=passed,
failed=failed, unknown=unknown)
self.populate_checks_results(
origin_attributes=origin_attributes,
destination_attributes=destination_attributes,
passed=passed,
failed=failed,
unknown=unknown,
)
else:
for u, v in edge_dfs(graph_connector):
origin_attributes = graph_connector.nodes(data=True)[u]
Expand All @@ -84,9 +105,13 @@ def get_operation(

destination_attributes = graph_connector.nodes(data=True)[v]
if destination_attributes in opposite_vertices:
self.populate_checks_results(origin_attributes=origin_attributes,
destination_attributes=destination_attributes, passed=passed,
failed=failed, unknown=unknown)
self.populate_checks_results(
origin_attributes=origin_attributes,
destination_attributes=destination_attributes,
passed=passed,
failed=failed,
unknown=unknown,
)
destination_attributes["connected_node"] = origin_attributes
continue

Expand All @@ -98,7 +123,7 @@ def get_operation(
output_destination = graph_connector.nodes(data=True)[output_destination]
output_destination_type = output_destination.get(CustomAttributes.RESOURCE_TYPE)
if self.is_associated_edge(
origin_attributes.get(CustomAttributes.RESOURCE_TYPE), output_destination_type
origin_attributes.get(CustomAttributes.RESOURCE_TYPE), output_destination_type
):
passed.extend([origin_attributes, output_destination])
except StopIteration:
Expand All @@ -113,4 +138,5 @@ def get_operation(
if v not in itertools.chain(passed, unknown)
]
)

return passed, failed, unknown
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from typing import List, Optional, Dict, Any, Tuple, TYPE_CHECKING

from checkov.common.graph.checks_infra import debug
from checkov.common.graph.checks_infra.enums import Operators
from checkov.common.checks_infra.solvers.connections_solvers.connection_exists_solver import ConnectionExistsSolver

Expand All @@ -28,5 +29,14 @@ def __init__(

def get_operation(self, graph_connector: LibraryGraph) -> \
Tuple[List[Dict[str, Any]], List[Dict[str, Any]], List[Dict[str, Any]]]:
passed, failed, unknown = super().get_operation(graph_connector)
passed, failed, unknown = super()._get_operation(graph_connector)

debug.connection_block(
resource_types=self.resource_types,
connected_resource_types=self.connected_resources_types,
operator=self.operator,
passed_resources=failed, # it has to be switched here, like the output
failed_resources=passed,
)

return failed, passed, unknown
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from typing import List, Optional, Dict, Any, Tuple, TYPE_CHECKING
from checkov.common.checks_infra.solvers import ConnectionExistsSolver
from checkov.common.graph.checks_infra import debug
from checkov.common.graph.checks_infra.enums import Operators

if TYPE_CHECKING:
Expand All @@ -27,7 +28,16 @@ def __init__(

def get_operation(self, graph_connector: LibraryGraph) -> \
Tuple[List[Dict[str, Any]], List[Dict[str, Any]], List[Dict[str, Any]]]:
passed, failed, unknown = super().get_operation(graph_connector)
passed, failed, unknown = super()._get_operation(graph_connector)
failed = [f for f in failed if f not in passed]
unknown = [u for u in unknown if u not in passed]

debug.connection_block(
resource_types=self.resource_types,
connected_resource_types=self.connected_resources_types,
operator=self.operator,
passed_resources=passed,
failed_resources=failed,
)

return passed, failed, unknown
Loading

0 comments on commit 6b7c626

Please sign in to comment.