Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(terraform_plan): Expose field changes to python checks #5112

Merged
merged 15 commits into from
Jun 6, 2023
Merged
27 changes: 19 additions & 8 deletions checkov/terraform/plan_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
SIMPLE_TYPES = (str, int, float, bool)
TF_PLAN_RESOURCE_ADDRESS = "__address__"
TF_PLAN_RESOURCE_CHANGE_ACTIONS = "__change_actions__"
TF_PLAN_RESOURCE_CHANGE_KEYS = "__change_keys__"

RESOURCE_TYPES_JSONIFY = {
"aws_batch_job_definition": "container_properties",
Expand Down Expand Up @@ -129,7 +130,6 @@ def _prepare_resource_block(
:param resource: tf planned_values resource block
:param conf: tf configuration resource block
:param resource_changes: tf resource_changes block

:returns:
- resource_block: a list of strings representing the header columns
- prepared: whether conditions met to prepare data
Expand Down Expand Up @@ -161,6 +161,7 @@ def _prepare_resource_block(
changes = resource_changes.get(resource_address) # type:ignore[arg-type] # becaus eit can be None
if changes:
resource_conf[TF_PLAN_RESOURCE_CHANGE_ACTIONS] = changes.get("change", {}).get("actions") or []
resource_conf[TF_PLAN_RESOURCE_CHANGE_KEYS] = changes["changed_keys"]
tarfeef101 marked this conversation as resolved.
Show resolved Hide resolved

resource_block[resource_type][resource.get("name", "default")] = resource_conf
prepared = True
Expand All @@ -186,7 +187,7 @@ def _find_child_modules(
nested_blocks = _find_child_modules(
child_modules=nested_child_modules,
resource_changes=resource_changes,
root_module_conf=root_module_conf
root_module_conf=root_module_conf,
)
for block_type, resource_blocks in nested_blocks.items():
blocks[block_type].extend(resource_blocks)
Expand Down Expand Up @@ -238,14 +239,24 @@ def _get_resource_changes(template: dict[str, Any]) -> dict[str, dict[str, Any]]
"""Returns a resource address to resource changes dict"""

resource_changes_map = {}

resource_changes = template.get("resource_changes")
if resource_changes and isinstance(resource_changes, list):
resource_changes_map = {
change.get("address", ""): change
for change in resource_changes
}

if resource_changes and isinstance(resource_changes, list):
for each in resource_changes:
tarfeef101 marked this conversation as resolved.
Show resolved Hide resolved
resource_changes_map[each["address"]] = each
changes = []

# before + after are None when resources are created/destroyed, so make them safe
if not each["change"]["before"]:
each["change"]["before"] = {}
if not each["change"]["after"]:
each["change"]["after"] = {}
tarfeef101 marked this conversation as resolved.
Show resolved Hide resolved

for field in each["change"]["before"]:
if each["change"]["before"][field] != each["change"]["after"].get(field):
tarfeef101 marked this conversation as resolved.
Show resolved Hide resolved
changes.append(field)

resource_changes_map[each["address"]]["changed_keys"] = changes
tarfeef101 marked this conversation as resolved.
Show resolved Hide resolved
return resource_changes_map


Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
from __future__ import annotations

from typing import Any

from checkov.common.models.enums import CheckResult, CheckCategories
from checkov.terraform.checks.resource.base_resource_check import BaseResourceCheck
from checkov.terraform.plan_parser import TF_PLAN_RESOURCE_CHANGE_ACTIONS, TF_PLAN_RESOURCE_CHANGE_KEYS


class SecurityGroupRuleProtocolChanged(BaseResourceCheck):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you also use this check in a test case to prove you have passing and failing checks?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

idk, can I? I am not an appdev so I did a lot of guessing/inferencing to arrive at what I just commited 😅

lmk if I did what I intended to

def __init__(self) -> None:
name = "Ensure security group rule protocol is not being changed"
id = "CUSTOM_CHANGE_1"
supported_resources = ("aws_security_group_rule",)
categories = (CheckCategories.GENERAL_SECURITY,)
super().__init__(name=name, id=id, categories=categories, supported_resources=supported_resources)

def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult:
actions = conf.get(TF_PLAN_RESOURCE_CHANGE_ACTIONS)
if isinstance(actions, list) and "update" in actions:
if "protocol" in conf.get(TF_PLAN_RESOURCE_CHANGE_KEYS):
self.details.append("some great details")
return CheckResult.FAILED
return CheckResult.PASSED


scanner = SecurityGroupRuleProtocolChanged()