Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(terraform): Ensure ephemeral disks are used for OS disks #5584

Merged
merged 1 commit into from
Sep 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions checkov/terraform/checks/resource/azure/AKSEphemeralOSDisks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from checkov.common.models.enums import CheckCategories
from checkov.terraform.checks.resource.base_resource_value_check import BaseResourceValueCheck
from typing import Any


class AKSEphemeralOSDisks(BaseResourceValueCheck):
def __init__(self) -> None:
"""
Temporary data can contain sensitive data at some points, by using ephemeral disks,
we ensure that data written to OS disk is stored on local VM storage and isn't persisted to Azure Storage

Azure automatically replicates data stored in the managed OS disk of a virtual machine to Azure storage
to avoid data loss in case the virtual machine needs to be relocated to another host.
Generally speaking, containers are not designed to have local state persisted to the managed OS disk,
hence this behavior offers limited value to AKS hosted while providing some drawbacks,
including slower node provisioning and higher read/write latency.

Ephemeral disks allow us also to have faster cluster operations like scale or upgrade
due to faster re-imaging and boot times.
"""
name = "Ensure ephemeral disks are used for OS disks"
id = "CKV_AZURE_226"
supported_resources = ("azurerm_kubernetes_cluster",)
categories = (CheckCategories.KUBERNETES,)
super().__init__(name=name, id=id, categories=categories, supported_resources=supported_resources)

def get_inspected_key(self) -> str:
return "os_disk_type"

def get_expected_value(self) -> Any:
return "Ephemeral"


check = AKSEphemeralOSDisks()
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
resource "azurerm_kubernetes_cluster" "pass" {
name = "internal"
kubernetes_cluster_id = azurerm_kubernetes_cluster.example.id
vm_size = "Standard_DS2_v2"
node_count = 1
os_disk_type = "Ephemeral"

tags = {
Environment = "Production"
}
}

resource "azurerm_kubernetes_cluster" "fail" {
name = "internal"
kubernetes_cluster_id = azurerm_kubernetes_cluster.example.id
vm_size = "Standard_DS2_v2"
node_count = 1

tags = {
Environment = "Production"
}
}

resource "azurerm_kubernetes_cluster" "fail2" {
name = "internal"
kubernetes_cluster_id = azurerm_kubernetes_cluster.example.id
vm_size = "Standard_DS2_v2"
node_count = 1
os_disk_type = "Managed"

tags = {
Environment = "Production"
}
}
42 changes: 42 additions & 0 deletions tests/terraform/checks/resource/azure/test_AKSEphemeralOSDisks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import os
import unittest

from checkov.runner_filter import RunnerFilter
from checkov.terraform.runner import Runner
from checkov.terraform.checks.resource.azure.AKSEphemeralOSDisks import check


class AKSEphemeralOSDisks(unittest.TestCase):

def test(self):
runner = Runner()
current_dir = os.path.dirname(os.path.realpath(__file__))

test_files_dir = os.path.join(current_dir, "example_AKSEphemeralOSDisks")
report = runner.run(root_folder=test_files_dir,
runner_filter=RunnerFilter(checks=[check.id]))
summary = report.get_summary()

passing_resources = {
'azurerm_kubernetes_cluster.pass',
}
failing_resources = {
'azurerm_kubernetes_cluster.fail',
'azurerm_kubernetes_cluster.fail2',
}
skipped_resources = {}

passed_check_resources = set([c.resource for c in report.passed_checks])
failed_check_resources = set([c.resource for c in report.failed_checks])

self.assertEqual(summary['passed'], len(passing_resources))
self.assertEqual(summary['failed'], len(failing_resources))
self.assertEqual(summary['skipped'], len(skipped_resources))
self.assertEqual(summary['parsing_errors'], 0)

self.assertEqual(passing_resources, passed_check_resources)
self.assertEqual(failing_resources, failed_check_resources)


if __name__ == '__main__':
unittest.main()
Loading