From beb601ebf390ac3cc363a0ac0a75de4055add468 Mon Sep 17 00:00:00 2001
From: gruebel
Date: Wed, 4 Oct 2023 22:07:26 +0200
Subject: [PATCH] adjust check logic

---
 .../VMScaleSetsAutoOSImagePatchingEnabled.py | 56 +++++++++----------
 1 file changed, 27 insertions(+), 29 deletions(-)

diff --git a/checkov/arm/checks/resource/VMScaleSetsAutoOSImagePatchingEnabled.py b/checkov/arm/checks/resource/VMScaleSetsAutoOSImagePatchingEnabled.py
index a8672e855cd..53e35feb4bc 100644
--- a/checkov/arm/checks/resource/VMScaleSetsAutoOSImagePatchingEnabled.py
+++ b/checkov/arm/checks/resource/VMScaleSetsAutoOSImagePatchingEnabled.py
@@ -1,43 +1,41 @@
+from __future__ import annotations
+
+from typing import Any
+
 from checkov.common.models.enums import CheckCategories, CheckResult
 from checkov.arm.base_resource_check import BaseResourceCheck
+from checkov.common.util.data_structures_utils import find_in_dict
 
 
 class VMScaleSetsAutoOSImagePatchingEnabled(BaseResourceCheck):
     def __init__(self) -> None:
         name = "Ensure that automatic OS image patching is enabled for Virtual Machine Scale Sets"
         id = "CKV_AZURE_95"
-        supported_resources = ['Microsoft.Compute/virtualMachineScaleSets']
-        categories = [CheckCategories.GENERAL_SECURITY]
+        supported_resources = ("Microsoft.Compute/virtualMachineScaleSets",)
+        categories = (CheckCategories.GENERAL_SECURITY,)
         super().__init__(name=name, id=id, categories=categories, supported_resources=supported_resources)
 
-    def scan_resource_conf(self, conf) -> CheckResult:
-        if conf.get("properties") and isinstance(conf.get("properties"), dict):
-            properties = conf.get("properties")
-            self.evaluated_keys = ['properties']
-
-            if properties.get("orchestrationMode") and isinstance(properties.get("orchestrationMode"), str):
-                if properties.get("orchestrationMode") == "Flexible":
-                    self.evaluated_keys = ['properties/orchestrationMode']
-                    return CheckResult.FAILED
-
-            if properties.get("virtualMachineProfile") and isinstance(properties.get("virtualMachineProfile"), dict):
-                virtualMachineProfile = properties.get("virtualMachineProfile")
-                self.evaluated_keys = ['properties/virtualMachineProfile']
-
-                if virtualMachineProfile.get("extensionProfile") and isinstance(virtualMachineProfile.get("extensionProfile"), dict):
-                    extensionProfile = virtualMachineProfile.get("extensionProfile")
-                    self.evaluated_keys = ['properties/virtualMachineProfile/extensionProfile']
-
-                    if extensionProfile.get("extensions") and isinstance(extensionProfile.get("extensions"), list):
-                        extensions = extensionProfile.get("extensions")
-                        self.evaluated_keys = ['properties/virtualMachineProfile/extensionProfile/extensions']
-
-                        for extension in extensions:
-                            if extension.get("properties") and isinstance(extension.get("properties"), dict):
-                                properties = extension.get("properties")
-                                if properties.get("enableAutomaticUpgrade") is True and isinstance(properties.get("autoUpgradeMinorVersion"), bool):
-                                    return CheckResult.PASSED
+    def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult:
+        properties = conf.get("properties")
+        if properties and isinstance(properties, dict):
+            if properties.get("orchestrationMode") == "Flexible":
+                self.evaluated_keys = ["properties/orchestrationMode"]
+                return CheckResult.FAILED
+
+            self.evaluated_keys = ["properties/virtualMachineProfile/extensionProfile/extensions"]
+            extensions = find_in_dict(
+                input_dict=properties,
+                key_path="virtualMachineProfile/extensionProfile/extensions",
+            )
+            if extensions:
+                for extension in extensions:
+                    extension_properties = extension.get("properties")
+                    if extension_properties and isinstance(extension_properties, dict):
+                        if extension_properties.get("enableAutomaticUpgrade") is True:
+                            return CheckResult.PASSED
+            return CheckResult.FAILED
+        return CheckResult.UNKNOWN
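Review bot: The loop over `extensions` in the new scan_resource_conf drops the type guards the removed lines had, so an `extensions` value that is not a list (find_in_dict's return type is not visible here) or an entry that is not a dict makes `extension.get("properties")` raise AttributeError and the check errors out on a malformed template instead of returning a result; change `if extensions:` to `if isinstance(extensions, list):` and guard the entry with `extension_properties = extension.get("properties") if isinstance(extension, dict) else None`. The new PASS condition also no longer requires `autoUpgradeMinorVersion` to be a bool as the removed line did; if that loosening is intended, a one-line comment above the `enableAutomaticUpgrade` test saying so would keep the next reader from re-adding it. The hunk header counts four more trailing context lines than are shown here, so the end of the class is outside this view.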