Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(dms): add new check dms_replication_task_source_logging_enabled #5627

Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
{
"Provider": "aws",
"CheckID": "dms_replication_task_source_logging_enabled",
"CheckTitle": "Check if DMS replication tasks for the source database have logging enabled.",
"CheckType": [
"Software and Configuration Checks/AWS Security Best Practices"
],
"ServiceName": "dms",
"SubServiceName": "",
"ResourceIdTemplate": "arn:aws:dms:region:account-id:task/task-id",
"Severity": "medium",
"ResourceType": "AwsDmsReplicationTask",
"Description": "This control checks whether logging is enabled with the minimum severity level of LOGGER_SEVERITY_DEFAULT for DMS replication tasks SOURCE_CAPTURE and SOURCE_UNLOAD. The control fails if logging isn't enabled for these tasks or if the minimum severity level is less than LOGGER_SEVERITY_DEFAULT.",
"Risk": "Without logging enabled, issues in data migration may go undetected, affecting the integrity and compliance of replicated data.",
"RelatedUrl": "https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Monitoring.html#CHAP_Monitoring.ManagingLogs",
"Remediation": {
"Code": {
"CLI": "aws dms modify-replication-task --replication-task-arn <task-arn> --task-settings '{\"Logging\":{\"EnableLogging\":true,\"LogComponents\":[{\"Id\":\"SOURCE_CAPTURE\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"SOURCE_UNLOAD\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"}]}}'",
"NativeIaC": "",
"Other": "https://docs.aws.amazon.com/securityhub/latest/userguide/dms-controls.html#dms-8",
"Terraform": ""
},
"Recommendation": {
"Text": "Enable logging for source database DMS replication tasks with a minimum severity level of LOGGER_SEVERITY_DEFAULT.",
"Url": "https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Tasks.CustomizingTasks.TaskSettings.Logging.html"
}
},
"Categories": [],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
from typing import List

from prowler.lib.check.models import Check, Check_Report_AWS
from prowler.providers.aws.services.dms.dms_client import dms_client


class dms_replication_task_source_logging_enabled(Check):
"""
Check if AWS DMS replication tasks have logging enabled with the required
logging components and severity levels.

This class verifies that each DMS replication task has logging enabled
and that the components SOURCE_CAPTURE and SOURCE_UNLOAD are configured with
at least LOGGER_SEVERITY_DEFAULT severity level. If either component is missing
or does not meet the minimum severity requirement, the check will fail.
"""

def execute(self) -> List[Check_Report_AWS]:
"""
Execute the DMS replication task logging requirements check.

Iterates over all DMS replication tasks and generates a report indicating
whether each task has logging enabled and meets the logging requirements
for SOURCE_CAPTURE and SOURCE_UNLOAD components.

Returns:
List[Check_Report_AWS]: A list of report objects with the results of the check.
"""
MINIMUM_SEVERITY_LEVELS = [
"LOGGER_SEVERITY_DEFAULT",
"LOGGER_SEVERITY_DEBUG",
"LOGGER_SEVERITY_DETAILED_DEBUG",
]
findings = []
for (
replication_task_arn,
replication_task,
) in dms_client.replication_tasks.items():
report = Check_Report_AWS(self.metadata())
report.resource_id = replication_task.id
report.resource_arn = replication_task_arn
report.region = replication_task.region
report.resource_tags = replication_task.tags

if not replication_task.logging_enabled:
report.status = "FAIL"
report.status_extended = f"DMS Replication Task {replication_task.id} does not have logging enabled for source events."
else:
missing_components = []
source_capture_compliant = False
source_unload_compliant = False

for component in replication_task.log_components:
if (
component["Id"] == "SOURCE_CAPTURE"
and component["Severity"] in MINIMUM_SEVERITY_LEVELS
):
source_capture_compliant = True
elif (
component["Id"] == "SOURCE_UNLOAD"
and component["Severity"] in MINIMUM_SEVERITY_LEVELS
):
source_unload_compliant = True

if not source_capture_compliant:
missing_components.append("Source Capture")
if not source_unload_compliant:
missing_components.append("Source Unload")

if source_capture_compliant and source_unload_compliant:
report.status = "PASS"
report.status_extended = f"DMS Replication Task {replication_task.id} has logging enabled with the minimum severity level in source events."
else:
report.status = "FAIL"
report.status_extended = f"DMS Replication Task {replication_task.id} does not meet the minimum severity level of logging in {', '.join(missing_components)} events."

findings.append(report)

return findings
48 changes: 47 additions & 1 deletion prowler/providers/aws/services/dms/dms_service.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import json
from typing import Optional

from pydantic import BaseModel
Expand All @@ -13,10 +14,13 @@ def __init__(self, provider):
super().__init__(__class__.__name__, provider)
self.instances = []
self.endpoints = {}
self.replication_tasks = {}
self.__threading_call__(self._describe_replication_instances)
self.__threading_call__(self._list_tags, self.instances)
self.__threading_call__(self._describe_endpoints)
self.__threading_call__(self._list_tags, self.endpoints.values())
self.__threading_call__(self._describe_replication_tasks)
self.__threading_call__(self._list_tags, self.replication_tasks.values())

def _describe_replication_instances(self, regional_client):
logger.info("DMS - Describing DMS Replication Instances...")
Expand Down Expand Up @@ -87,6 +91,37 @@ def _describe_endpoints(self, regional_client):
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)

def _describe_replication_tasks(self, regional_client):
logger.info("DMS - Describing DMS Replication Tasks for Logging Settings...")
try:
paginator = regional_client.get_paginator("describe_replication_tasks")
for page in paginator.paginate():
for task in page["ReplicationTasks"]:
arn = task["ReplicationTaskArn"]
if not self.audit_resources or (
is_resource_filtered(arn, self.audit_resources)
):
task_settings = json.loads(
task.get("ReplicationTaskSettings", "")
)
self.replication_tasks[arn] = ReplicationTasks(
arn=arn,
id=task["ReplicationTaskIdentifier"],
region=regional_client.region,
source_endpoint_arn=task["SourceEndpointArn"],
target_endpoint_arn=task["TargetEndpointArn"],
logging_enabled=task_settings.get("Logging", {}).get(
"EnableLogging", False
),
log_components=task_settings.get("Logging", {}).get(
"LogComponents", []
),
)
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)

def _list_tags(self, resource: any):
try:
resource.tags = self.regional_clients[
Expand All @@ -103,11 +138,11 @@ class Endpoint(BaseModel):
id: str
region: str
ssl_mode: str
tags: Optional[list]
redis_ssl_protocol: str
mongodb_auth_type: str
neptune_iam_auth_enabled: bool = False
engine_name: str
tags: Optional[list]


class RepInstance(BaseModel):
Expand All @@ -121,3 +156,14 @@ class RepInstance(BaseModel):
multi_az: bool
region: str
tags: Optional[list] = []


class ReplicationTasks(BaseModel):
arn: str
id: str
region: str
source_endpoint_arn: str
target_endpoint_arn: str
logging_enabled: bool = False
log_components: list[dict] = []
tags: Optional[list] = []
Loading