Skip to content

Commit

Permalink
feat(mutelist): add mute_finding method (#5563)
Browse files Browse the repository at this point in the history
  • Loading branch information
pedrooot authored Nov 4, 2024
1 parent 17dd9de commit bf91113
Show file tree
Hide file tree
Showing 10 changed files with 182 additions and 18 deletions.
2 changes: 1 addition & 1 deletion prowler/lib/cli/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
default_output_directory,
)
from prowler.lib.check.models import Severity
from prowler.lib.outputs.finding import Status
from prowler.lib.outputs.common import Status
from prowler.providers.common.arguments import (
init_providers_parser,
validate_provider_arguments,
Expand Down
31 changes: 31 additions & 0 deletions prowler/lib/mutelist/mutelist.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

from prowler.lib.logger import logger
from prowler.lib.mutelist.models import mutelist_schema
from prowler.lib.outputs.common import Status
from prowler.lib.outputs.utils import unroll_dict, unroll_tags


class Mutelist(ABC):
Expand Down Expand Up @@ -237,6 +239,35 @@ def is_muted_in_check(
)
return False

def mute_finding(self, finding):
"""
Check if the provided finding is muted
Args:
finding (Finding): The finding to be evaluated for muting.
Returns:
Finding: The finding with the status updated if it is muted, otherwise the finding is returned
"""
try:
if self.is_muted(
finding.account_uid,
finding.metadata.CheckID,
finding.region,
finding.resource_uid,
unroll_dict(unroll_tags(finding.resource_tags)),
):
finding.raw["status"] = finding.status
finding.status = Status.MUTED
finding.muted = True
return finding
except Exception as error:
logger.error(
f"{error.__class__.__name__} -- {error}[{error.__traceback__.tb_lineno}]"
)
return finding

def is_excepted(
self,
exceptions,
Expand Down
9 changes: 9 additions & 0 deletions prowler/lib/outputs/common.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from enum import Enum

from prowler.config.config import timestamp
from prowler.lib.outputs.utils import unroll_tags
from prowler.lib.utils.utils import outputs_unix_timestamp
Expand All @@ -15,3 +17,10 @@ def fill_common_finding_data(finding: dict, unix_timestamp: bool) -> dict:
"resource_tags": unroll_tags(finding.resource_tags),
}
return finding_data


class Status(str, Enum):
PASS = "PASS"
FAIL = "FAIL"
MANUAL = "MANUAL"
MUTED = "MUTED"
17 changes: 2 additions & 15 deletions prowler/lib/outputs/finding.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,17 @@
from datetime import datetime
from enum import Enum
from typing import Optional, Union

from pydantic import BaseModel, Field

from prowler.config.config import prowler_version
from prowler.lib.check.models import Check_Report, CheckMetadata
from prowler.lib.logger import logger
from prowler.lib.outputs.common import fill_common_finding_data
from prowler.lib.outputs.common import Status, fill_common_finding_data
from prowler.lib.outputs.compliance.compliance import get_check_compliance
from prowler.lib.utils.utils import dict_to_lowercase, get_nested_attribute
from prowler.providers.common.provider import Provider


class Status(str, Enum):
PASS = "PASS"
FAIL = "FAIL"
MANUAL = "MANUAL"


class Finding(BaseModel):
"""
Represents the output model for a finding across different providers.
Expand Down Expand Up @@ -49,6 +42,7 @@ class Finding(BaseModel):
region: str
compliance: dict
prowler_version: str = prowler_version
raw: dict = Field(default_factory=dict)

@property
def provider(self) -> str:
Expand Down Expand Up @@ -85,13 +79,6 @@ def service_name(self) -> str:
"""
return self.metadata.ServiceName

@property
def raw(self) -> dict:
"""
Returns the raw (dict) finding without any post-processing.
"""
return {}

def get_metadata(self) -> dict:
"""
Retrieves the metadata of the object and returns it as a dictionary with all keys in lowercase.
Expand Down
3 changes: 2 additions & 1 deletion prowler/lib/scan/scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
from prowler.lib.check.compliance_models import Compliance
from prowler.lib.check.models import CheckMetadata, Severity
from prowler.lib.logger import logger
from prowler.lib.outputs.finding import Finding, Status
from prowler.lib.outputs.common import Status
from prowler.lib.outputs.finding import Finding
from prowler.lib.scan.exceptions.exceptions import (
ScanInvalidCategoryError,
ScanInvalidCheckError,
Expand Down
3 changes: 2 additions & 1 deletion tests/lib/outputs/finding_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
Remediation,
Severity,
)
from prowler.lib.outputs.finding import Finding, Status
from prowler.lib.outputs.common import Status
from prowler.lib.outputs.finding import Finding
from tests.lib.outputs.fixtures.fixtures import generate_finding_output


Expand Down
33 changes: 33 additions & 0 deletions tests/providers/aws/lib/mutelist/aws_mutelist_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

from prowler.config.config import encoding_format_utf_8
from prowler.providers.aws.lib.mutelist.mutelist import AWSMutelist
from tests.lib.outputs.fixtures.fixtures import generate_finding_output
from tests.providers.aws.services.awslambda.awslambda_service_test import (
create_zip_file,
)
Expand Down Expand Up @@ -1844,3 +1845,35 @@ def test_is_muted_in_resource_starting_by_star(self):
allowlist_resources = ["*.es"]

assert AWSMutelist.is_item_matched(allowlist_resources, "google.es")

def test_mute_finding(self):
# Mutelist
mutelist_content = {
"Accounts": {
AWS_ACCOUNT_NUMBER: {
"Checks": {
"check_test": {
"Regions": [AWS_REGION_US_EAST_1, AWS_REGION_EU_WEST_1],
"Resources": ["prowler", "^test", "prowler-pro"],
}
}
}
}
}
mutelist = AWSMutelist(mutelist_content=mutelist_content)

# Finding
finding_1 = generate_finding_output(
check_id="check_test",
status="FAIL",
region=AWS_REGION_US_EAST_1,
resource_uid="prowler",
resource_tags=[],
muted=False,
)

muted_finding = mutelist.mute_finding(finding_1)

assert muted_finding.status == "MUTED"
assert muted_finding.muted
assert muted_finding.raw["status"] == "FAIL"
34 changes: 34 additions & 0 deletions tests/providers/azure/lib/mutelist/azure_mutelist_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from mock import MagicMock

from prowler.providers.azure.lib.mutelist.mutelist import AzureMutelist
from tests.lib.outputs.fixtures.fixtures import generate_finding_output

MUTELIST_FIXTURE_PATH = (
"tests/providers/azure/lib/mutelist/fixtures/azure_mutelist.yaml"
Expand Down Expand Up @@ -66,3 +67,36 @@ def test_is_finding_muted(self):
finding.subscription = "subscription_1"

assert mutelist.is_finding_muted(finding)

def test_mute_finding(self):
# Mutelist
mutelist_content = {
"Accounts": {
"subscription_1": {
"Checks": {
"check_test": {
"Regions": ["*"],
"Resources": ["test_resource"],
}
}
}
}
}

mutelist = AzureMutelist(mutelist_content=mutelist_content)

finding_1 = generate_finding_output(
check_id="check_test",
status="FAIL",
account_uid="subscription_1",
region="subscription_1",
resource_uid="test_resource",
resource_tags=[],
muted=False,
)

muted_finding = mutelist.mute_finding(finding=finding_1)

assert muted_finding.status == "MUTED"
assert muted_finding.muted is True
assert muted_finding.raw["status"] == "FAIL"
34 changes: 34 additions & 0 deletions tests/providers/gcp/lib/mutelist/gcp_mutelist_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from mock import MagicMock

from prowler.providers.gcp.lib.mutelist.mutelist import GCPMutelist
from tests.lib.outputs.fixtures.fixtures import generate_finding_output

MUTELIST_FIXTURE_PATH = "tests/providers/gcp/lib/mutelist/fixtures/gcp_mutelist.yaml"

Expand Down Expand Up @@ -64,3 +65,36 @@ def test_is_finding_muted(self):
finding.project_id = "project_1"

assert mutelist.is_finding_muted(finding)

def test_mute_finding(self):
# Mutelist
mutelist_content = {
"Accounts": {
"project_1": {
"Checks": {
"check_test": {
"Regions": ["*"],
"Resources": ["test_resource"],
}
}
}
}
}

mutelist = GCPMutelist(mutelist_content=mutelist_content)

finding_1 = generate_finding_output(
check_id="check_test",
status="FAIL",
account_uid="project_1",
region="test-region",
resource_uid="test_resource",
resource_tags=[],
muted=False,
)

muted_finding = mutelist.mute_finding(finding=finding_1)

assert muted_finding.status == "MUTED"
assert muted_finding.muted
assert muted_finding.raw["status"] == "FAIL"
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from mock import MagicMock

from prowler.providers.kubernetes.lib.mutelist.mutelist import KubernetesMutelist
from tests.lib.outputs.fixtures.fixtures import generate_finding_output

MUTELIST_FIXTURE_PATH = (
"tests/providers/kubernetes/lib/mutelist/fixtures/kubernetes_mutelist.yaml"
Expand Down Expand Up @@ -128,3 +129,36 @@ def test_is_finding_muted_apiserver_star_within_check_name(self):
finding.resource_tags = []

assert mutelist.is_finding_muted(finding, "cluster_1")

def test_mute_finding(self):
# Mutelist
mutelist_content = {
"Accounts": {
"cluster_1": {
"Checks": {
"check_test": {
"Regions": ["*"],
"Resources": ["test_resource"],
}
}
}
}
}

mutelist = KubernetesMutelist(mutelist_content=mutelist_content)

finding_1 = generate_finding_output(
check_id="check_test",
status="FAIL",
account_uid="cluster_1",
region="test-region",
resource_uid="test_resource",
resource_tags=[],
muted=False,
)

muted_finding = mutelist.mute_finding(finding_1)

assert muted_finding.status == "MUTED"
assert muted_finding.muted is True
assert muted_finding.raw["status"] == "FAIL"

0 comments on commit bf91113

Please sign in to comment.