diff --git a/checkov/terraform/checks/resource/azure/AKSEphemeralOSDisks.py b/checkov/terraform/checks/resource/azure/AKSEphemeralOSDisks.py new file mode 100644 index 00000000000..be8642d5aa4 --- /dev/null +++ b/checkov/terraform/checks/resource/azure/AKSEphemeralOSDisks.py @@ -0,0 +1,34 @@ +from checkov.common.models.enums import CheckCategories +from checkov.terraform.checks.resource.base_resource_value_check import BaseResourceValueCheck +from typing import Any + + +class AKSEphemeralOSDisks(BaseResourceValueCheck): + def __init__(self) -> None: + """ + Temporary data can contain sensitive data at some points, by using ephemeral disks, + we ensure that data written to OS disk is stored on local VM storage and isn't persisted to Azure Storage + + Azure automatically replicates data stored in the managed OS disk of a virtual machine to Azure storage + to avoid data loss in case the virtual machine needs to be relocated to another host. + Generally speaking, containers are not designed to have local state persisted to the managed OS disk, + hence this behavior offers limited value to AKS hosted while providing some drawbacks, + including slower node provisioning and higher read/write latency. + + Ephemeral disks allow us also to have faster cluster operations like scale or upgrade + due to faster re-imaging and boot times. + """ + name = "Ensure ephemeral disks are used for OS disks" + id = "CKV_AZURE_226" + supported_resources = ("azurerm_kubernetes_cluster",) + categories = (CheckCategories.KUBERNETES,) + super().__init__(name=name, id=id, categories=categories, supported_resources=supported_resources) + + def get_inspected_key(self) -> str: + return "os_disk_type" + + def get_expected_value(self) -> Any: + return "Ephemeral" + + +check = AKSEphemeralOSDisks() diff --git a/tests/terraform/checks/resource/azure/example_AKSEphemeralOSDisks/main.tf b/tests/terraform/checks/resource/azure/example_AKSEphemeralOSDisks/main.tf new file mode 100644 index 00000000000..d3fb6ca0a9b --- /dev/null +++ b/tests/terraform/checks/resource/azure/example_AKSEphemeralOSDisks/main.tf @@ -0,0 +1,34 @@ +resource "azurerm_kubernetes_cluster" "pass" { + name = "internal" + kubernetes_cluster_id = azurerm_kubernetes_cluster.example.id + vm_size = "Standard_DS2_v2" + node_count = 1 + os_disk_type = "Ephemeral" + + tags = { + Environment = "Production" + } +} + +resource "azurerm_kubernetes_cluster" "fail" { + name = "internal" + kubernetes_cluster_id = azurerm_kubernetes_cluster.example.id + vm_size = "Standard_DS2_v2" + node_count = 1 + + tags = { + Environment = "Production" + } +} + +resource "azurerm_kubernetes_cluster" "fail2" { + name = "internal" + kubernetes_cluster_id = azurerm_kubernetes_cluster.example.id + vm_size = "Standard_DS2_v2" + node_count = 1 + os_disk_type = "Managed" + + tags = { + Environment = "Production" + } +} \ No newline at end of file diff --git a/tests/terraform/checks/resource/azure/test_AKSEphemeralOSDisks.py b/tests/terraform/checks/resource/azure/test_AKSEphemeralOSDisks.py new file mode 100644 index 00000000000..68c96026b2e --- /dev/null +++ b/tests/terraform/checks/resource/azure/test_AKSEphemeralOSDisks.py @@ -0,0 +1,42 @@ +import os +import unittest + +from checkov.runner_filter import RunnerFilter +from checkov.terraform.runner import Runner +from checkov.terraform.checks.resource.azure.AKSEphemeralOSDisks import check + + +class AKSEphemeralOSDisks(unittest.TestCase): + + def test(self): + runner = Runner() + current_dir = os.path.dirname(os.path.realpath(__file__)) + + test_files_dir = os.path.join(current_dir, "example_AKSEphemeralOSDisks") + report = runner.run(root_folder=test_files_dir, + runner_filter=RunnerFilter(checks=[check.id])) + summary = report.get_summary() + + passing_resources = { + 'azurerm_kubernetes_cluster.pass', + } + failing_resources = { + 'azurerm_kubernetes_cluster.fail', + 'azurerm_kubernetes_cluster.fail2', + } + skipped_resources = {} + + passed_check_resources = set([c.resource for c in report.passed_checks]) + failed_check_resources = set([c.resource for c in report.failed_checks]) + + self.assertEqual(summary['passed'], len(passing_resources)) + self.assertEqual(summary['failed'], len(failing_resources)) + self.assertEqual(summary['skipped'], len(skipped_resources)) + self.assertEqual(summary['parsing_errors'], 0) + + self.assertEqual(passing_resources, passed_check_resources) + self.assertEqual(failing_resources, failed_check_resources) + + +if __name__ == '__main__': + unittest.main()