Skip to content

Commit

Permalink
feat(terraform): add CKV_AWS_360 to ensure backup retention period on…
Browse files Browse the repository at this point in the history
… AWS Document DB (#5547)

* feature(AWS DocumentDB): add adequate backup retention check

* feature(cloudformation): add adequate backup retention check

* Fix tests
  • Loading branch information
SteveVaknin committed Sep 10, 2023
1 parent b978759 commit 2db2347
Show file tree
Hide file tree
Showing 7 changed files with 181 additions and 0 deletions.
25 changes: 25 additions & 0 deletions checkov/cloudformation/checks/resource/aws/DocDBBackupRetention.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
from checkov.common.models.enums import CheckCategories, CheckResult
from checkov.cloudformation.checks.resource.base_resource_value_check import BaseResourceValueCheck


class DocDBBackupRetention(BaseResourceValueCheck):
def __init__(self) -> None:
name = "Ensure DocDB has an adequate backup retention period"
id = "CKV_AWS_360"
supported_resources = ("AWS::DocDB::DBCluster",)
categories = (CheckCategories.BACKUP_AND_RECOVERY,)
super().__init__(name=name, id=id, categories=categories, supported_resources=supported_resources)

def get_inspected_key(self) -> str:
return "Properties/BackupRetentionPeriod"

def scan_resource_conf(self, conf) -> CheckResult:
properties = conf.get("Properties")
if properties:
backup_retention_period = properties.get("BackupRetentionPeriod", 1)
if backup_retention_period >= 7:
return CheckResult.PASSED
return CheckResult.FAILED


check = DocDBBackupRetention()
22 changes: 22 additions & 0 deletions checkov/terraform/checks/resource/aws/DocDBBackupRetention.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from checkov.common.models.enums import CheckCategories, CheckResult
from checkov.terraform.checks.resource.base_resource_value_check import BaseResourceValueCheck


class DocDBBackupRetention(BaseResourceValueCheck):
def __init__(self):
name = "Ensure DocDB has an adequate backup retention period"
id = "CKV_AWS_360"
supported_resources = ['aws_docdb_cluster']
categories = [CheckCategories.BACKUP_AND_RECOVERY]
super().__init__(name=name, id=id, categories=categories, supported_resources=supported_resources)

def get_inspected_key(self):
return "backup_retention_period"

def scan_resource_conf(self, conf):
if conf.get("backup_retention_period", [1])[0] >= 7:
return CheckResult.PASSED
return CheckResult.FAILED


check = DocDBBackupRetention()
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
AWSTemplateFormatVersion: "2010-09-09"
Resources:
DocDBDefault:
Type: AWS::DocDB::DBCluster
Properties:
MasterUsername: name
MasterUserPassword: password
DocDBNotAdequate:
Type: AWS::DocDB::DBCluster
Properties:
MasterUsername: name
MasterUserPassword: password
BackupRetentionPeriod: 3
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
AWSTemplateFormatVersion: "2010-09-09"
Resources:
DocDBAdequate:
Type: AWS::DocDB::DBCluster
Properties:
MasterUsername: name
MasterUserPassword: password
BackupRetentionPeriod: 7
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import os
import unittest

from checkov.cloudformation.checks.resource.aws.DocDBBackupRetention import check
from checkov.cloudformation.runner import Runner
from checkov.runner_filter import RunnerFilter


class TestDocDBAuditLogs(unittest.TestCase):
def test_summary(self):
runner = Runner()
current_dir = os.path.dirname(os.path.realpath(__file__))

test_files_dir = current_dir + "/example_DocDBBackupRetention"
report = runner.run(root_folder=test_files_dir, runner_filter=RunnerFilter(checks=[check.id]))
summary = report.get_summary()

passing_resources = {
"AWS::DocDB::DBCluster.DocDBAdequate",
}
failing_resources = {
"AWS::DocDB::DBCluster.DocDBDefault",
"AWS::DocDB::DBCluster.DocDBNotAdequate",
}

passed_check_resources = set([c.resource for c in report.passed_checks])
failed_check_resources = set([c.resource for c in report.failed_checks])

self.assertEqual(summary["passed"], 1)
self.assertEqual(summary["failed"], 2)
self.assertEqual(summary["skipped"], 0)
self.assertEqual(summary["parsing_errors"], 0)

self.assertEqual(passing_resources, passed_check_resources)
self.assertEqual(failing_resources, failed_check_resources)


if __name__ == "__main__":
unittest.main()
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# pass

resource "aws_docdb_cluster" "pass" {
cluster_identifier = "my-docdb-cluster"
engine = "docdb"
master_username = "foo"
master_password = "mustbeeightchars" # checkov:skip=CKV_SECRET_6 test secret

backup_retention_period = 7
}



# fail

resource "aws_docdb_cluster" "fail_no_value" {
cluster_identifier = "my-docdb-cluster"
engine = "docdb"
master_username = "foo"
master_password = "mustbeeightchars" # checkov:skip=CKV_SECRET_6 test secret
}


resource "aws_docdb_cluster" "fail_value_not_adequate" {
cluster_identifier = "my-docdb-cluster"
engine = "docdb"
master_username = "foo"
master_password = "mustbeeightchars" # checkov:skip=CKV_SECRET_6 test secret

backup_retention_period = 3
}
43 changes: 43 additions & 0 deletions tests/terraform/checks/resource/aws/test_DocDBBackupRetention.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import unittest

from pathlib import Path

from checkov.runner_filter import RunnerFilter
from checkov.terraform.runner import Runner
from checkov.common.models.enums import CheckResult
from checkov.terraform.checks.resource.aws.DocDBBackupRetention import check


class TestDocDBBackupRetention(unittest.TestCase):

def test(self):
# given
test_files_dir = Path(__file__).parent / "example_DocDBBackupRetention"

# when
report = Runner().run(root_folder=str(test_files_dir), runner_filter=RunnerFilter(checks=[check.id]))

# then
summary = report.get_summary()

passing_resources = {
"aws_docdb_cluster.pass",
}
failing_resources = {
"aws_docdb_cluster.fail_no_value",
"aws_docdb_cluster.fail_value_not_adequate"
}

passed_check_resources = {c.resource for c in report.passed_checks}
failed_check_resources = {c.resource for c in report.failed_checks}

self.assertEqual(summary["passed"], len(passing_resources))
self.assertEqual(summary["failed"], len(failing_resources))
self.assertEqual(summary["parsing_errors"], 0)
self.assertEqual(summary["resource_count"], 3)

self.assertEqual(passing_resources, passed_check_resources)
self.assertEqual(failing_resources, failed_check_resources)

if __name__ == '__main__':
unittest.main()

0 comments on commit 2db2347

Please sign in to comment.