Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(aws): add MemoryDB service #5546

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file.
4 changes: 4 additions & 0 deletions prowler/providers/aws/services/memorydb/memorydb_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from prowler.providers.aws.services.memorydb.memorydb_service import MemoryDB
from prowler.providers.common.provider import Provider

memorydb_client = MemoryDB(Provider.get_global_provider())
66 changes: 66 additions & 0 deletions prowler/providers/aws/services/memorydb/memorydb_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
from pydantic import BaseModel

from prowler.lib.logger import logger
from prowler.lib.scan_filters.scan_filters import is_resource_filtered
from prowler.providers.aws.lib.service.service import AWSService


################## MemoryDB
class MemoryDB(AWSService):
def __init__(self, provider):
# Call AWSService's __init__
super().__init__(__class__.__name__, provider)
self.clusters = {}
self.__threading_call__(self._describe_clusters)

def _describe_clusters(self, regional_client):
logger.info("MemoryDB - Describe Clusters...")
try:
describe_clusters_paginator = regional_client.get_paginator(
"describe_clusters"
)
for page in describe_clusters_paginator.paginate():
for cluster in page["Clusters"]:
arn = cluster["ARN"]
if not self.audit_resources or (
is_resource_filtered(arn, self.audit_resources)
):
self.clusters[arn] = Cluster(
name=cluster["Name"],
arn=arn,
number_of_shards=cluster["NumberOfShards"],
engine=cluster["Engine"],
engine_version=cluster["EngineVersion"],
engine_patch_version=cluster["EnginePatchVersion"],
multi_az=cluster.get("AvailabilityMode", "singleaz"),
region=regional_client.region,
security_groups=[
sg["SecurityGroupId"]
for sg in cluster["SecurityGroups"]
if sg["Status"] == "active"
],
tls_enabled=cluster["TLSEnabled"],
auto_minor_version_upgrade=cluster[
"AutoMinorVersionUpgrade"
],
snapshot_limit=cluster["SnapshotRetentionLimit"],
)
except Exception as error:
logger.error(

Check warning on line 49 in prowler/providers/aws/services/memorydb/memorydb_service.py

View check run for this annotation

Codecov / codecov/patch

prowler/providers/aws/services/memorydb/memorydb_service.py#L48-L49

Added lines #L48 - L49 were not covered by tests
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)


class Cluster(BaseModel):
name: str
arn: str
number_of_shards: int
engine: str
engine_version: str
engine_patch_version: str
multi_az: str
region: str
security_groups: list[str] = []
tls_enabled: bool
auto_minor_version_upgrade: bool
snapshot_limit: int
110 changes: 110 additions & 0 deletions tests/providers/aws/services/memorydb/memorydb_service_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
import botocore
from mock import patch

from prowler.providers.aws.services.memorydb.memorydb_service import Cluster, MemoryDB
from tests.providers.aws.utils import (
AWS_ACCOUNT_NUMBER,
AWS_REGION_US_EAST_1,
set_mocked_aws_provider,
)

MEM_DB_CLUSTER_NAME = "test-cluster"
MEM_DB_CLUSTER_ARN = f"arn:aws:memorydb:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:cluster:{MEM_DB_CLUSTER_NAME}"
MEM_DB_ENGINE_VERSION = "5.0.0"

# Mocking Access Analyzer Calls
make_api_call = botocore.client.BaseClient._make_api_call


def mock_make_api_call(self, operation_name, kwargs):
"""
As you can see the operation_name has the list_analyzers snake_case form but
we are using the ListAnalyzers form.
Rationale -> https://github.com/boto/botocore/blob/develop/botocore/client.py#L810:L816

We have to mock every AWS API call using Boto3
"""
if operation_name == "DescribeClusters":
return {
"Clusters": [
{
"Name": MEM_DB_CLUSTER_NAME,
"Description": "Test",
"Status": "test",
"NumberOfShards": 123,
"AvailabilityMode": "singleaz",
"Engine": "valkey",
"EngineVersion": MEM_DB_ENGINE_VERSION,
"EnginePatchVersion": "5.0.6",
"SecurityGroups": [
{"SecurityGroupId": "sg-0a1434xxxxxc9fae", "Status": "active"},
],
"TLSEnabled": True,
"ARN": MEM_DB_CLUSTER_ARN,
"SnapshotRetentionLimit": 5,
"AutoMinorVersionUpgrade": True,
},
]
}
return make_api_call(self, operation_name, kwargs)


def mock_generate_regional_clients(provider, service):
regional_client = provider._session.current_session.client(
service, region_name=AWS_REGION_US_EAST_1
)
regional_client.region = AWS_REGION_US_EAST_1
return {AWS_REGION_US_EAST_1: regional_client}


@patch(
"prowler.providers.aws.aws_provider.AwsProvider.generate_regional_clients",
new=mock_generate_regional_clients,
)
# Patch every AWS call using Boto3
@patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call)
class Test_MemoryDB_Service:
# Test MemoryDB Service
def test_service(self):
aws_provider = set_mocked_aws_provider()
memorydb = MemoryDB(aws_provider)
assert memorydb.service == "memorydb"

# Test MemoryDB Client
def test_client(self):
aws_provider = set_mocked_aws_provider()
memorydb = MemoryDB(aws_provider)
assert memorydb.client.__class__.__name__ == "MemoryDB"

# Test MemoryDB Session
def test__get_session__(self):
aws_provider = set_mocked_aws_provider()
memorydb = MemoryDB(aws_provider)
assert memorydb.session.__class__.__name__ == "Session"

# Test MemoryDB Session
def test_audited_account(self):
aws_provider = set_mocked_aws_provider()
memorydb = MemoryDB(aws_provider)
assert memorydb.audited_account == AWS_ACCOUNT_NUMBER

# Test MemoryDB Describe Clusters
def test_describe_clusters(self):
aws_provider = set_mocked_aws_provider()
memorydb = MemoryDB(aws_provider)
assert memorydb.clusters == {
MEM_DB_CLUSTER_ARN: Cluster(
name=MEM_DB_CLUSTER_NAME,
arn=MEM_DB_CLUSTER_ARN,
number_of_shards=123,
engine="valkey",
engine_version=MEM_DB_ENGINE_VERSION,
engine_patch_version="5.0.6",
multi_az="singleaz",
region=AWS_REGION_US_EAST_1,
security_groups=["sg-0a1434xxxxxc9fae"],
tls_enabled=True,
auto_minor_version_upgrade=True,
snapshot_limit=5,
)
}