Skip to content

Commit

Permalink
chore: improve various Terraform checks (#5658)
Browse files Browse the repository at this point in the history
* improve various Terraform chceks

* fix linting
  • Loading branch information
gruebel committed Oct 18, 2023
1 parent f841619 commit 024b2b2
Show file tree
Hide file tree
Showing 6 changed files with 50 additions and 60 deletions.
Original file line number Diff line number Diff line change
@@ -1,30 +1,32 @@
from typing import List
from __future__ import annotations

from typing import Any

from checkov.common.models.enums import CheckResult, CheckCategories
from checkov.cloudformation.checks.resource.base_resource_check import BaseResourceCheck


class LambdaEnvironmentEncryptionSettings(BaseResourceCheck):
def __init__(self):
def __init__(self) -> None:
name = "Check encryption settings for Lambda environmental variable"
id = "CKV_AWS_173"
supported_resources = ['AWS::Lambda::Function', "AWS::Serverless::Function"]
categories = [CheckCategories.ENCRYPTION]
supported_resources = ("AWS::Lambda::Function", "AWS::Serverless::Function")
categories = (CheckCategories.ENCRYPTION,)
super().__init__(name=name, id=id, categories=categories, supported_resources=supported_resources)

def scan_resource_conf(self, conf):
properties = conf.get('Properties')
def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult:
properties = conf.get("Properties")
if properties is not None:
env = properties.get('Environment')
env = properties.get("Environment")
if env is not None:
if not isinstance(env, dict):
return CheckResult.UNKNOWN
elif env.get('Variables') and not properties.get('KmsKeyArn'):
elif env.get("Variables") and not properties.get("KmsKeyArn"):
return CheckResult.FAILED
return CheckResult.PASSED

def get_evaluated_keys(self) -> List[str]:
return ['Properties/Environment/Variables', 'Properties/KmsKeyArn']
def get_evaluated_keys(self) -> list[str]:
return ["Properties/KmsKeyArn"]


check = LambdaEnvironmentEncryptionSettings()
14 changes: 8 additions & 6 deletions checkov/terraform/checks/resource/aws/EKSPublicAccessCIDR.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,28 @@
from __future__ import annotations

from typing import Any

from checkov.common.models.enums import CheckResult, CheckCategories
from checkov.terraform.checks.resource.base_resource_check import BaseResourceCheck


class EKSPublicAccessCIDR(BaseResourceCheck):
def __init__(self):
def __init__(self) -> None:
name = "Ensure Amazon EKS public endpoint not accessible to 0.0.0.0/0"
id = "CKV_AWS_38"
supported_resources = ['aws_eks_cluster']
categories = [CheckCategories.KUBERNETES]
supported_resources = ('aws_eks_cluster',)
categories = (CheckCategories.KUBERNETES,)
super().__init__(name=name, id=id, categories=categories, supported_resources=supported_resources)

def scan_resource_conf(self, conf):
def scan_resource_conf(self, conf: dict[str, list[Any]]) -> CheckResult:
"""
Looks for public_access_cidrs at aws_eks_cluster:
https://www.terraform.io/docs/providers/aws/r/eks_cluster.html
:param conf: aws_eks_cluster configuration
:return: <CheckResult>
"""
self.evaluated_keys = ['vpc_config']
if "vpc_config" in conf.keys():
if "endpoint_public_access" in conf["vpc_config"][0] and not conf["vpc_config"][0]["endpoint_public_access"][0]:
self.evaluated_keys = ['vpc_config/[0]/endpoint_public_access']
return CheckResult.PASSED
elif "public_access_cidrs" in conf["vpc_config"][0]:
self.evaluated_keys = ['vpc_config/[0]/public_access_cidrs']
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,19 @@ def scan_resource_conf(self, conf: dict[str, list[Any]]) -> CheckResult:
# check that if I have env vars I have a KMS key
if len(conf.get("environment", [])):
if "kms_key_arn" in conf:
if conf["kms_key_arn"] == [""]:
self.evaluated_keys = ["environment/kms_key_arn"]
if conf.get("kms_key_arn") == [""]:
return CheckResult.FAILED
return CheckResult.PASSED
self.evaluated_keys = ["environment"]
return CheckResult.FAILED

# no env vars so should be no key as that causes state mismatch
if "kms_key_arn" in conf:
if not len(conf["kms_key_arn"]):
return CheckResult.PASSED
if "kms_key_arn" in conf and len(conf["kms_key_arn"]):
return CheckResult.FAILED
# neither env vars nor kms key
return CheckResult.UNKNOWN

def get_evaluated_keys(self) -> list[str]:
return ["environment/[0]/variables"]
return ["kms_key_arn"]


check = LambdaEnvironmentEncryptionSettings()
6 changes: 3 additions & 3 deletions checkov/terraform/checks/resource/aws/SNSTopicEncryption.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import List, Any
from typing import Any

from checkov.common.models.enums import CheckCategories
from checkov.common.models.consts import ANY_VALUE
Expand All @@ -16,8 +16,8 @@ def __init__(self) -> None:
def get_inspected_key(self) -> str:
return "kms_master_key_id"

def get_expected_values(self) -> List[Any]:
return [ANY_VALUE]
def get_expected_value(self) -> Any:
return ANY_VALUE


check = SNSTopicEncryption()
Original file line number Diff line number Diff line change
@@ -1,24 +1,22 @@
from checkov.common.models.enums import CheckResult, CheckCategories
from checkov.terraform.checks.resource.base_resource_value_check import BaseResourceCheck
from typing import List
from typing import Any

from checkov.common.models.enums import CheckCategories
from checkov.terraform.checks.resource.base_resource_value_check import BaseResourceValueCheck

class MariaDBPublicAccessDisabled(BaseResourceCheck):
def __init__(self):

class MariaDBPublicAccessDisabled(BaseResourceValueCheck):
def __init__(self) -> None:
name = "Ensure 'public network access enabled' is set to 'False' for MariaDB servers"
id = "CKV_AZURE_48"
supported_resources = ['azurerm_mariadb_server']
categories = [CheckCategories.NETWORKING]
supported_resources = ("azurerm_mariadb_server",)
categories = (CheckCategories.NETWORKING,)
super().__init__(name=name, id=id, categories=categories, supported_resources=supported_resources)

def scan_resource_conf(self, conf):
# Whether or not public network access is allowed for this server. Defaults to true. Which is not optimal
if 'public_network_access_enabled' not in conf or conf['public_network_access_enabled'][0]:
return CheckResult.FAILED
return CheckResult.PASSED
def get_inspected_key(self) -> str:
return "public_network_access_enabled"

def get_evaluated_keys(self) -> List[str]:
return ['public_network_access_enabled']
def get_expected_value(self) -> Any:
return False


check = MariaDBPublicAccessDisabled()
32 changes: 12 additions & 20 deletions checkov/terraform/checks/resource/gcp/GKEClusterLogging.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,22 @@
from checkov.terraform.checks.resource.base_resource_check import BaseResourceCheck
from checkov.common.models.enums import CheckResult, CheckCategories
from typing import List
from typing import Any

from checkov.common.models.enums import CheckCategories
from checkov.terraform.checks.resource.base_resource_negative_value_check import BaseResourceNegativeValueCheck

class GKEClusterLogging(BaseResourceCheck):
def __init__(self):

class GKEClusterLogging(BaseResourceNegativeValueCheck):
def __init__(self) -> None:
name = "Ensure Stackdriver Logging is set to Enabled on Kubernetes Engine Clusters"
id = "CKV_GCP_1"
supported_resources = ['google_container_cluster']
categories = [CheckCategories.KUBERNETES]
supported_resources = ("google_container_cluster",)
categories = (CheckCategories.KUBERNETES,)
super().__init__(name=name, id=id, categories=categories, supported_resources=supported_resources)

def scan_resource_conf(self, conf):
"""
Looks for password configuration at azure_instance:
https://www.terraform.io/docs/providers/google/r/compute_ssl_policy.html
:param conf: google_compute_ssl_policy configuration
:return: <CheckResult>
"""
if 'logging_service' in conf.keys():
if conf['logging_service'][0] == "none":
return CheckResult.FAILED
return CheckResult.PASSED
def get_inspected_key(self) -> str:
return "logging_service"

def get_evaluated_keys(self) -> List[str]:
return ['logging_service']
def get_forbidden_values(self) -> Any:
return "none"


check = GKEClusterLogging()

0 comments on commit 024b2b2

Please sign in to comment.