Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: improve various Terraform checks #5658

Merged
merged 2 commits into from
Oct 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,30 +1,32 @@
from typing import List
from __future__ import annotations

from typing import Any

from checkov.common.models.enums import CheckResult, CheckCategories
from checkov.cloudformation.checks.resource.base_resource_check import BaseResourceCheck


class LambdaEnvironmentEncryptionSettings(BaseResourceCheck):
def __init__(self):
def __init__(self) -> None:
name = "Check encryption settings for Lambda environmental variable"
id = "CKV_AWS_173"
supported_resources = ['AWS::Lambda::Function', "AWS::Serverless::Function"]
categories = [CheckCategories.ENCRYPTION]
supported_resources = ("AWS::Lambda::Function", "AWS::Serverless::Function")
categories = (CheckCategories.ENCRYPTION,)
super().__init__(name=name, id=id, categories=categories, supported_resources=supported_resources)

def scan_resource_conf(self, conf):
properties = conf.get('Properties')
def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult:
properties = conf.get("Properties")
if properties is not None:
env = properties.get('Environment')
env = properties.get("Environment")
if env is not None:
if not isinstance(env, dict):
return CheckResult.UNKNOWN
elif env.get('Variables') and not properties.get('KmsKeyArn'):
elif env.get("Variables") and not properties.get("KmsKeyArn"):
return CheckResult.FAILED
return CheckResult.PASSED

def get_evaluated_keys(self) -> List[str]:
return ['Properties/Environment/Variables', 'Properties/KmsKeyArn']
def get_evaluated_keys(self) -> list[str]:
return ["Properties/KmsKeyArn"]


check = LambdaEnvironmentEncryptionSettings()
14 changes: 8 additions & 6 deletions checkov/terraform/checks/resource/aws/EKSPublicAccessCIDR.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,28 @@
from __future__ import annotations

from typing import Any

from checkov.common.models.enums import CheckResult, CheckCategories
from checkov.terraform.checks.resource.base_resource_check import BaseResourceCheck


class EKSPublicAccessCIDR(BaseResourceCheck):
def __init__(self):
def __init__(self) -> None:
name = "Ensure Amazon EKS public endpoint not accessible to 0.0.0.0/0"
id = "CKV_AWS_38"
supported_resources = ['aws_eks_cluster']
categories = [CheckCategories.KUBERNETES]
supported_resources = ('aws_eks_cluster',)
categories = (CheckCategories.KUBERNETES,)
super().__init__(name=name, id=id, categories=categories, supported_resources=supported_resources)

def scan_resource_conf(self, conf):
def scan_resource_conf(self, conf: dict[str, list[Any]]) -> CheckResult:
"""
Looks for public_access_cidrs at aws_eks_cluster:
https://www.terraform.io/docs/providers/aws/r/eks_cluster.html
:param conf: aws_eks_cluster configuration
:return: <CheckResult>
"""
self.evaluated_keys = ['vpc_config']
SteveVaknin marked this conversation as resolved.
Show resolved Hide resolved
if "vpc_config" in conf.keys():
if "endpoint_public_access" in conf["vpc_config"][0] and not conf["vpc_config"][0]["endpoint_public_access"][0]:
self.evaluated_keys = ['vpc_config/[0]/endpoint_public_access']
return CheckResult.PASSED
elif "public_access_cidrs" in conf["vpc_config"][0]:
self.evaluated_keys = ['vpc_config/[0]/public_access_cidrs']
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,19 @@ def scan_resource_conf(self, conf: dict[str, list[Any]]) -> CheckResult:
# check that if I have env vars I have a KMS key
if len(conf.get("environment", [])):
if "kms_key_arn" in conf:
if conf["kms_key_arn"] == [""]:
self.evaluated_keys = ["environment/kms_key_arn"]
if conf.get("kms_key_arn") == [""]:
return CheckResult.FAILED
return CheckResult.PASSED
self.evaluated_keys = ["environment"]
return CheckResult.FAILED

# no env vars so should be no key as that causes state mismatch
if "kms_key_arn" in conf:
if not len(conf["kms_key_arn"]):
return CheckResult.PASSED
if "kms_key_arn" in conf and len(conf["kms_key_arn"]):
return CheckResult.FAILED
# neither env vars nor kms key
return CheckResult.UNKNOWN

def get_evaluated_keys(self) -> list[str]:
return ["environment/[0]/variables"]
return ["kms_key_arn"]


check = LambdaEnvironmentEncryptionSettings()
6 changes: 3 additions & 3 deletions checkov/terraform/checks/resource/aws/SNSTopicEncryption.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import List, Any
from typing import Any

from checkov.common.models.enums import CheckCategories
from checkov.common.models.consts import ANY_VALUE
Expand All @@ -16,8 +16,8 @@ def __init__(self) -> None:
def get_inspected_key(self) -> str:
return "kms_master_key_id"

def get_expected_values(self) -> List[Any]:
return [ANY_VALUE]
def get_expected_value(self) -> Any:
return ANY_VALUE


check = SNSTopicEncryption()
Original file line number Diff line number Diff line change
@@ -1,24 +1,22 @@
from checkov.common.models.enums import CheckResult, CheckCategories
from checkov.terraform.checks.resource.base_resource_value_check import BaseResourceCheck
from typing import List
from typing import Any

from checkov.common.models.enums import CheckCategories
from checkov.terraform.checks.resource.base_resource_value_check import BaseResourceValueCheck

class MariaDBPublicAccessDisabled(BaseResourceCheck):
def __init__(self):

class MariaDBPublicAccessDisabled(BaseResourceValueCheck):
def __init__(self) -> None:
name = "Ensure 'public network access enabled' is set to 'False' for MariaDB servers"
id = "CKV_AZURE_48"
supported_resources = ['azurerm_mariadb_server']
categories = [CheckCategories.NETWORKING]
supported_resources = ("azurerm_mariadb_server",)
categories = (CheckCategories.NETWORKING,)
super().__init__(name=name, id=id, categories=categories, supported_resources=supported_resources)

def scan_resource_conf(self, conf):
# Whether or not public network access is allowed for this server. Defaults to true. Which is not optimal
if 'public_network_access_enabled' not in conf or conf['public_network_access_enabled'][0]:
return CheckResult.FAILED
return CheckResult.PASSED
def get_inspected_key(self) -> str:
return "public_network_access_enabled"

def get_evaluated_keys(self) -> List[str]:
return ['public_network_access_enabled']
def get_expected_value(self) -> Any:
return False


check = MariaDBPublicAccessDisabled()
32 changes: 12 additions & 20 deletions checkov/terraform/checks/resource/gcp/GKEClusterLogging.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,22 @@
from checkov.terraform.checks.resource.base_resource_check import BaseResourceCheck
from checkov.common.models.enums import CheckResult, CheckCategories
from typing import List
from typing import Any

from checkov.common.models.enums import CheckCategories
from checkov.terraform.checks.resource.base_resource_negative_value_check import BaseResourceNegativeValueCheck

class GKEClusterLogging(BaseResourceCheck):
def __init__(self):

class GKEClusterLogging(BaseResourceNegativeValueCheck):
def __init__(self) -> None:
name = "Ensure Stackdriver Logging is set to Enabled on Kubernetes Engine Clusters"
id = "CKV_GCP_1"
supported_resources = ['google_container_cluster']
categories = [CheckCategories.KUBERNETES]
supported_resources = ("google_container_cluster",)
categories = (CheckCategories.KUBERNETES,)
super().__init__(name=name, id=id, categories=categories, supported_resources=supported_resources)

def scan_resource_conf(self, conf):
"""
Looks for password configuration at azure_instance:
https://www.terraform.io/docs/providers/google/r/compute_ssl_policy.html
:param conf: google_compute_ssl_policy configuration
:return: <CheckResult>
"""
if 'logging_service' in conf.keys():
if conf['logging_service'][0] == "none":
return CheckResult.FAILED
return CheckResult.PASSED
def get_inspected_key(self) -> str:
return "logging_service"

def get_evaluated_keys(self) -> List[str]:
return ['logging_service']
def get_forbidden_values(self) -> Any:
return "none"


check = GKEClusterLogging()