Skip to content

Commit

Permalink
Merge branch 'main' into itai_fix_wrong_abs_path
Browse files Browse the repository at this point in the history
  • Loading branch information
ipeleg committed Oct 18, 2023
2 parents db4aeeb + 02528c3 commit 11533e9
Show file tree
Hide file tree
Showing 10 changed files with 67 additions and 67 deletions.
Original file line number Diff line number Diff line change
@@ -1,30 +1,32 @@
from typing import List
from __future__ import annotations

from typing import Any

from checkov.common.models.enums import CheckResult, CheckCategories
from checkov.cloudformation.checks.resource.base_resource_check import BaseResourceCheck


class LambdaEnvironmentEncryptionSettings(BaseResourceCheck):
def __init__(self):
def __init__(self) -> None:
name = "Check encryption settings for Lambda environmental variable"
id = "CKV_AWS_173"
supported_resources = ['AWS::Lambda::Function', "AWS::Serverless::Function"]
categories = [CheckCategories.ENCRYPTION]
supported_resources = ("AWS::Lambda::Function", "AWS::Serverless::Function")
categories = (CheckCategories.ENCRYPTION,)
super().__init__(name=name, id=id, categories=categories, supported_resources=supported_resources)

def scan_resource_conf(self, conf):
properties = conf.get('Properties')
def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult:
properties = conf.get("Properties")
if properties is not None:
env = properties.get('Environment')
env = properties.get("Environment")
if env is not None:
if not isinstance(env, dict):
return CheckResult.UNKNOWN
elif env.get('Variables') and not properties.get('KmsKeyArn'):
elif env.get("Variables") and not properties.get("KmsKeyArn"):
return CheckResult.FAILED
return CheckResult.PASSED

def get_evaluated_keys(self) -> List[str]:
return ['Properties/Environment/Variables', 'Properties/KmsKeyArn']
def get_evaluated_keys(self) -> list[str]:
return ["Properties/KmsKeyArn"]


check = LambdaEnvironmentEncryptionSettings()
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def get_inspected_key(self) -> str:
return "version"

def get_expected_values(self) -> list[Any]:
return ["1.23", "1.24", "1.25", "1.26", "1.27"]
return ["1.23", "1.24", "1.25", "1.26", "1.27", "1.28"]


check = EKSPlatformVersion()
14 changes: 8 additions & 6 deletions checkov/terraform/checks/resource/aws/EKSPublicAccessCIDR.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,28 @@
from __future__ import annotations

from typing import Any

from checkov.common.models.enums import CheckResult, CheckCategories
from checkov.terraform.checks.resource.base_resource_check import BaseResourceCheck


class EKSPublicAccessCIDR(BaseResourceCheck):
def __init__(self):
def __init__(self) -> None:
name = "Ensure Amazon EKS public endpoint not accessible to 0.0.0.0/0"
id = "CKV_AWS_38"
supported_resources = ['aws_eks_cluster']
categories = [CheckCategories.KUBERNETES]
supported_resources = ('aws_eks_cluster',)
categories = (CheckCategories.KUBERNETES,)
super().__init__(name=name, id=id, categories=categories, supported_resources=supported_resources)

def scan_resource_conf(self, conf):
def scan_resource_conf(self, conf: dict[str, list[Any]]) -> CheckResult:
"""
Looks for public_access_cidrs at aws_eks_cluster:
https://www.terraform.io/docs/providers/aws/r/eks_cluster.html
:param conf: aws_eks_cluster configuration
:return: <CheckResult>
"""
self.evaluated_keys = ['vpc_config']
if "vpc_config" in conf.keys():
if "endpoint_public_access" in conf["vpc_config"][0] and not conf["vpc_config"][0]["endpoint_public_access"][0]:
self.evaluated_keys = ['vpc_config/[0]/endpoint_public_access']
return CheckResult.PASSED
elif "public_access_cidrs" in conf["vpc_config"][0]:
self.evaluated_keys = ['vpc_config/[0]/public_access_cidrs']
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,19 @@ def scan_resource_conf(self, conf: dict[str, list[Any]]) -> CheckResult:
# check that if I have env vars I have a KMS key
if len(conf.get("environment", [])):
if "kms_key_arn" in conf:
if conf["kms_key_arn"] == [""]:
self.evaluated_keys = ["environment/kms_key_arn"]
if conf.get("kms_key_arn") == [""]:
return CheckResult.FAILED
return CheckResult.PASSED
self.evaluated_keys = ["environment"]
return CheckResult.FAILED

# no env vars so should be no key as that causes state mismatch
if "kms_key_arn" in conf:
if not len(conf["kms_key_arn"]):
return CheckResult.PASSED
if "kms_key_arn" in conf and len(conf["kms_key_arn"]):
return CheckResult.FAILED
# neither env vars nor kms key
return CheckResult.UNKNOWN

def get_evaluated_keys(self) -> list[str]:
return ["environment/[0]/variables"]
return ["kms_key_arn"]


check = LambdaEnvironmentEncryptionSettings()
6 changes: 3 additions & 3 deletions checkov/terraform/checks/resource/aws/SNSTopicEncryption.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import List, Any
from typing import Any

from checkov.common.models.enums import CheckCategories
from checkov.common.models.consts import ANY_VALUE
Expand All @@ -16,8 +16,8 @@ def __init__(self) -> None:
def get_inspected_key(self) -> str:
return "kms_master_key_id"

def get_expected_values(self) -> List[Any]:
return [ANY_VALUE]
def get_expected_value(self) -> Any:
return ANY_VALUE


check = SNSTopicEncryption()
Original file line number Diff line number Diff line change
@@ -1,24 +1,22 @@
from checkov.common.models.enums import CheckResult, CheckCategories
from checkov.terraform.checks.resource.base_resource_value_check import BaseResourceCheck
from typing import List
from typing import Any

from checkov.common.models.enums import CheckCategories
from checkov.terraform.checks.resource.base_resource_value_check import BaseResourceValueCheck

class MariaDBPublicAccessDisabled(BaseResourceCheck):
def __init__(self):

class MariaDBPublicAccessDisabled(BaseResourceValueCheck):
def __init__(self) -> None:
name = "Ensure 'public network access enabled' is set to 'False' for MariaDB servers"
id = "CKV_AZURE_48"
supported_resources = ['azurerm_mariadb_server']
categories = [CheckCategories.NETWORKING]
supported_resources = ("azurerm_mariadb_server",)
categories = (CheckCategories.NETWORKING,)
super().__init__(name=name, id=id, categories=categories, supported_resources=supported_resources)

def scan_resource_conf(self, conf):
# Whether or not public network access is allowed for this server. Defaults to true. Which is not optimal
if 'public_network_access_enabled' not in conf or conf['public_network_access_enabled'][0]:
return CheckResult.FAILED
return CheckResult.PASSED
def get_inspected_key(self) -> str:
return "public_network_access_enabled"

def get_evaluated_keys(self) -> List[str]:
return ['public_network_access_enabled']
def get_expected_value(self) -> Any:
return False


check = MariaDBPublicAccessDisabled()
32 changes: 12 additions & 20 deletions checkov/terraform/checks/resource/gcp/GKEClusterLogging.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,22 @@
from checkov.terraform.checks.resource.base_resource_check import BaseResourceCheck
from checkov.common.models.enums import CheckResult, CheckCategories
from typing import List
from typing import Any

from checkov.common.models.enums import CheckCategories
from checkov.terraform.checks.resource.base_resource_negative_value_check import BaseResourceNegativeValueCheck

class GKEClusterLogging(BaseResourceCheck):
def __init__(self):

class GKEClusterLogging(BaseResourceNegativeValueCheck):
def __init__(self) -> None:
name = "Ensure Stackdriver Logging is set to Enabled on Kubernetes Engine Clusters"
id = "CKV_GCP_1"
supported_resources = ['google_container_cluster']
categories = [CheckCategories.KUBERNETES]
supported_resources = ("google_container_cluster",)
categories = (CheckCategories.KUBERNETES,)
super().__init__(name=name, id=id, categories=categories, supported_resources=supported_resources)

def scan_resource_conf(self, conf):
"""
Looks for password configuration at azure_instance:
https://www.terraform.io/docs/providers/google/r/compute_ssl_policy.html
:param conf: google_compute_ssl_policy configuration
:return: <CheckResult>
"""
if 'logging_service' in conf.keys():
if conf['logging_service'][0] == "none":
return CheckResult.FAILED
return CheckResult.PASSED
def get_inspected_key(self) -> str:
return "logging_service"

def get_evaluated_keys(self) -> List[str]:
return ['logging_service']
def get_forbidden_values(self) -> Any:
return "none"


check = GKEClusterLogging()
18 changes: 14 additions & 4 deletions checkov/terraform/parser_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,12 @@

from checkov.common.util.data_structures_utils import pickle_deepcopy
from checkov.common.util.type_forcers import convert_str_to_bool
from checkov.common.util.parser_utils import eval_string, split_merge_args, string_to_native, to_string
from checkov.common.util.parser_utils import (
eval_string,
split_merge_args,
string_to_native,
to_string,
)

#
# Functions defined in this file implement terraform functions.
Expand Down Expand Up @@ -121,7 +126,7 @@ def toset(original: str, **_: Any) -> set[Any] | str:

def tomap(original: str, **_: Any) -> dict[Hashable, Any] | str:
# https://www.terraform.io/docs/language/functions/tomap.html
original = original.replace(":", "=") # converted to colons by parser #shrug
original = original.replace(":", "=") # converted to colons by parser #shrug

altered_value = eval_string(original)
if altered_value is None or not isinstance(altered_value, dict):
Expand All @@ -136,7 +141,7 @@ def map(original: str, **_: Any) -> dict[Hashable, Any] | str:
# the issue, act like it's a list (to allow comma separation) and let the HCL
# parser deal with it. Then iterating the list is easy.
converted_to_list = eval_string(f"[{original}]")
if converted_to_list is None or len(converted_to_list) & 1: # none or odd number of args
if converted_to_list is None or len(converted_to_list) & 1: # none or odd number of args
return FUNCTION_FAILED

return create_map(converted_to_list)
Expand Down Expand Up @@ -190,8 +195,13 @@ def handle_dynamic_values(conf: Dict[str, List[Any]], has_dynamic_block: bool =


def process_dynamic_values(conf: Dict[str, List[Any]]) -> bool:
dynamic_conf: Union[List[Any], Dict[str, List[Any]]] = conf.get("dynamic", {})

if not isinstance(dynamic_conf, list):
return False

has_dynamic_block = False
for dynamic_element in conf.get("dynamic", {}):
for dynamic_element in dynamic_conf:
if isinstance(dynamic_element, str):
try:
dynamic_element = json.loads(dynamic_element)
Expand Down
2 changes: 1 addition & 1 deletion checkov/version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version = '2.5.11'
version = '2.5.12'
2 changes: 1 addition & 1 deletion kubernetes/requirements.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
checkov==2.5.11
checkov==2.5.12

0 comments on commit 11533e9

Please sign in to comment.