Skip to content

Commit

Permalink
fix(aws): handle global WAFv2 ACLs in service (#5628)
Browse files Browse the repository at this point in the history
  • Loading branch information
MrCloudSec authored Nov 5, 2024
1 parent 1b50fdb commit 6ff1c43
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"CheckID": "autoscaling_group_capacity_rebalance_enabled",
"CheckTitle": "Check if Amazon EC2 Auto Scaling groups have capacity rebalance enabled.",
"CheckType": [
"Resiliance"
"Resilience"
],
"ServiceName": "autoscaling",
"SubServiceName": "",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"CheckID": "ecs_task_definitions_logging_block_mode",
"CheckTitle": "ECS task definitions containers should have a logging configured with non blocking mode",
"CheckType": [
"Resiliance"
"Resilience"
],
"ServiceName": "ecs",
"SubServiceName": "",
Expand Down
82 changes: 44 additions & 38 deletions prowler/providers/aws/services/wafv2/wafv2_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,13 @@ def _list_web_acls_regional(self, regional_client):
def _get_logging_configuration(self, acl):
logger.info("WAFv2 - Get Logging Configuration...")
try:
logging_enabled = self.regional_clients[
acl.region
].get_logging_configuration(ResourceArn=acl.arn)
acl.logging_enabled = bool(
logging_enabled["LoggingConfiguration"]["LogDestinationConfigs"]
)
if acl.scope == Scope.REGIONAL or acl.region in self.regional_clients:
logging_enabled = self.regional_clients[
acl.region
].get_logging_configuration(ResourceArn=acl.arn)
acl.logging_enabled = bool(
logging_enabled["LoggingConfiguration"]["LogDestinationConfigs"]
)

except ClientError as error:
if error.response["Error"]["Code"] == "WAFNonexistentItemException":
Expand All @@ -98,7 +99,7 @@ def _get_logging_configuration(self, acl):
def _list_resources_for_web_acl(self, acl):
logger.info("WAFv2 - Describing resources...")
try:
if acl.scope == Scope.REGIONAL:
if acl.scope == Scope.REGIONAL or acl.region in self.regional_clients:
for resource in self.regional_clients[
acl.region
].list_resources_for_web_acl(
Expand All @@ -125,33 +126,34 @@ def _list_resources_for_web_acl(self, acl):
def _get_web_acl(self, acl: str):
logger.info("WAFv2 - Getting Web ACL...")
try:
scope = acl.scope.value
get_web_acl = self.regional_clients[acl.region].get_web_acl(
Name=acl.name, Scope=scope, Id=acl.id
)
if acl.scope == Scope.REGIONAL or acl.region in self.regional_clients:
scope = acl.scope.value
get_web_acl = self.regional_clients[acl.region].get_web_acl(
Name=acl.name, Scope=scope, Id=acl.id
)

try:
rules = get_web_acl.get("WebACL", {}).get("Rules", [])
for rule in rules:
new_rule = Rule(
name=rule.get("Name", ""),
cloudwatch_metrics_enabled=rule.get("VisibilityConfig", {}).get(
"CloudWatchMetricsEnabled", False
),
try:
rules = get_web_acl.get("WebACL", {}).get("Rules", [])
for rule in rules:
new_rule = Rule(
name=rule.get("Name", ""),
cloudwatch_metrics_enabled=rule.get(
"VisibilityConfig", {}
).get("CloudWatchMetricsEnabled", False),
)
if (
rule.get("Statement", {})
.get("RuleGroupReferenceStatement", {})
.get("ARN")
):
acl.rule_groups.append(new_rule)
else:
acl.rules.append(new_rule)

except Exception as error:
logger.error(
f"{acl.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
if (
rule.get("Statement", {})
.get("RuleGroupReferenceStatement", {})
.get("ARN")
):
acl.rule_groups.append(new_rule)
else:
acl.rules.append(new_rule)

except Exception as error:
logger.error(
f"{acl.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)

except Exception as error:
logger.error(
Expand All @@ -161,12 +163,16 @@ def _get_web_acl(self, acl: str):
def _list_tags(self, resource: any):
logger.info("WAFv2 - Listing tags...")
try:
resource.tags = (
self.regional_clients[resource.region]
.list_tags_for_resource(ResourceARN=resource.arn)
.get("TagInfoForResource", {})
.get("TagList", [])
)
if (
resource.scope == Scope.REGIONAL
or resource.region in self.regional_clients
):
resource.tags = (
self.regional_clients[resource.region]
.list_tags_for_resource(ResourceARN=resource.arn)
.get("TagInfoForResource", {})
.get("TagList", [])
)
except Exception as error:
logger.error(
f"{resource.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
Expand Down

0 comments on commit 6ff1c43

Please sign in to comment.