Skip to content

Commit

Permalink
feat(terraform): Ensure ephemeral disks are used for OS disks (#5584)
Browse files Browse the repository at this point in the history
Created check

Co-authored-by: Thomas Defise <[email protected]>
  • Loading branch information
tdefise and Thomas Defise committed Sep 29, 2023
1 parent a346c92 commit 9b06730
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 0 deletions.
34 changes: 34 additions & 0 deletions checkov/terraform/checks/resource/azure/AKSEphemeralOSDisks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from checkov.common.models.enums import CheckCategories
from checkov.terraform.checks.resource.base_resource_value_check import BaseResourceValueCheck
from typing import Any


class AKSEphemeralOSDisks(BaseResourceValueCheck):
def __init__(self) -> None:
"""
Temporary data can contain sensitive data at some points, by using ephemeral disks,
we ensure that data written to OS disk is stored on local VM storage and isn't persisted to Azure Storage
Azure automatically replicates data stored in the managed OS disk of a virtual machine to Azure storage
to avoid data loss in case the virtual machine needs to be relocated to another host.
Generally speaking, containers are not designed to have local state persisted to the managed OS disk,
hence this behavior offers limited value to AKS hosted while providing some drawbacks,
including slower node provisioning and higher read/write latency.
Ephemeral disks allow us also to have faster cluster operations like scale or upgrade
due to faster re-imaging and boot times.
"""
name = "Ensure ephemeral disks are used for OS disks"
id = "CKV_AZURE_226"
supported_resources = ("azurerm_kubernetes_cluster",)
categories = (CheckCategories.KUBERNETES,)
super().__init__(name=name, id=id, categories=categories, supported_resources=supported_resources)

def get_inspected_key(self) -> str:
return "os_disk_type"

def get_expected_value(self) -> Any:
return "Ephemeral"


check = AKSEphemeralOSDisks()
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
resource "azurerm_kubernetes_cluster" "pass" {
name = "internal"
kubernetes_cluster_id = azurerm_kubernetes_cluster.example.id
vm_size = "Standard_DS2_v2"
node_count = 1
os_disk_type = "Ephemeral"

tags = {
Environment = "Production"
}
}

resource "azurerm_kubernetes_cluster" "fail" {
name = "internal"
kubernetes_cluster_id = azurerm_kubernetes_cluster.example.id
vm_size = "Standard_DS2_v2"
node_count = 1

tags = {
Environment = "Production"
}
}

resource "azurerm_kubernetes_cluster" "fail2" {
name = "internal"
kubernetes_cluster_id = azurerm_kubernetes_cluster.example.id
vm_size = "Standard_DS2_v2"
node_count = 1
os_disk_type = "Managed"

tags = {
Environment = "Production"
}
}
42 changes: 42 additions & 0 deletions tests/terraform/checks/resource/azure/test_AKSEphemeralOSDisks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import os
import unittest

from checkov.runner_filter import RunnerFilter
from checkov.terraform.runner import Runner
from checkov.terraform.checks.resource.azure.AKSEphemeralOSDisks import check


class AKSEphemeralOSDisks(unittest.TestCase):

def test(self):
runner = Runner()
current_dir = os.path.dirname(os.path.realpath(__file__))

test_files_dir = os.path.join(current_dir, "example_AKSEphemeralOSDisks")
report = runner.run(root_folder=test_files_dir,
runner_filter=RunnerFilter(checks=[check.id]))
summary = report.get_summary()

passing_resources = {
'azurerm_kubernetes_cluster.pass',
}
failing_resources = {
'azurerm_kubernetes_cluster.fail',
'azurerm_kubernetes_cluster.fail2',
}
skipped_resources = {}

passed_check_resources = set([c.resource for c in report.passed_checks])
failed_check_resources = set([c.resource for c in report.failed_checks])

self.assertEqual(summary['passed'], len(passing_resources))
self.assertEqual(summary['failed'], len(failing_resources))
self.assertEqual(summary['skipped'], len(skipped_resources))
self.assertEqual(summary['parsing_errors'], 0)

self.assertEqual(passing_resources, passed_check_resources)
self.assertEqual(failing_resources, failed_check_resources)


if __name__ == '__main__':
unittest.main()

0 comments on commit 9b06730

Please sign in to comment.