-
Notifications
You must be signed in to change notification settings - Fork 0
/
Lambda_function_ecr_findings.py
216 lines (185 loc) · 7.95 KB
/
Lambda_function_ecr_findings.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
#!/bin/python3.10
#####################################################################
# Script developed by Sheyla Leacock
#####################################################################
import boto3
import pandas as pd
from datetime import datetime
import botocore.exceptions
import logging
import os
# Logging configuration: request INFO-level output through basicConfig and
# use the root logger for every message emitted by this module.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger()
# boto3 clients for the three AWS services this module talks to (ECR, S3, SNS).
# They are created once, when the module is imported, and shared by all the
# functions below.
ecr_client = boto3.client('ecr')
s3_client = boto3.client('s3')
sns_client = boto3.client('sns')
# Look up the ECR registry and return its registryId, which the ECR calls below take as a parameter.
def get_registry_id():
    """Return the registryId reported by the ECR describe_registry call.

    Returns:
        The value stored under "registryId" in the response (None if the key
        is absent), or None when the call raises a botocore ClientError; the
        error is logged in that case.
    """
    try:
        registry_info = ecr_client.describe_registry()
    except botocore.exceptions.ClientError as error:
        logger.error("Failed to describe registry: %s", error)
        return None
    return registry_info.get("registryId")
# List the images stored in an ECR repository, given the registryId and repositoryName.
def get_list_images(registry_id, repository_name):
    """List the image identifiers stored in an ECR repository.

    Every page of list_images is read by following nextToken, so repositories
    with more images than fit in a single response are listed completely.

    Args:
        registry_id: Registry that owns the repository.
        repository_name: Name of the repository to list.

    Returns:
        A list of dicts, each holding 'imageDigest' and, only when the image
        actually has one, 'imageTag'. The dicts are shaped so they can be
        passed as the imageId argument of other ECR calls. An empty list is
        returned when the call raises a botocore ClientError (the error is
        logged); results from earlier pages are discarded in that case.
    """
    request = {
        'registryId': registry_id,
        'repositoryName': repository_name,
    }
    image_ids = []
    try:
        while True:
            response = ecr_client.list_images(**request)
            for image in response.get('imageIds', []):
                # Copy only the identifiers ECR really returned. An untagged
                # image must not be given a made-up 'latest' tag: the dict is
                # later sent back to ECR as an imageId, where a digest paired
                # with a tag it does not carry identifies the wrong image (or
                # none at all).
                image_id = {
                    key: image[key]
                    for key in ('imageDigest', 'imageTag')
                    if image.get(key)
                }
                if image_id:
                    image_ids.append(image_id)
            next_token = response.get('nextToken')
            if not next_token:
                break
            request['nextToken'] = next_token
    except botocore.exceptions.ClientError as error:
        logger.error("Failed to list images: %s", error)
        return []
    return image_ids
# Fetch the scan findings of a single image, given the registryId, the repositoryName and the image id obtained above.
def get_image_scan_findings(registry_id, repository_name, image_id):
    """Collect the enhanced scan findings of one image across all result pages.

    Args:
        registry_id: Registry that owns the repository.
        repository_name: Repository that holds the image.
        image_id: Image identifier passed to ECR as imageId.

    Returns:
        A list with the "enhancedFindings" entries of every page, or an empty
        list when a call raises a botocore ClientError (the error is logged
        and anything collected so far is dropped).
    """
    request = {
        'registryId': registry_id,
        'repositoryName': repository_name,
        'imageId': image_id,
        'maxResults': 1000,
    }
    try:
        collected = []
        while True:
            page = ecr_client.describe_image_scan_findings(**request)
            scan_section = page.get("imageScanFindings", {})
            collected.extend(scan_section.get("enhancedFindings", []))
            if 'nextToken' not in page:
                return collected
            # Ask for the following page on the next iteration.
            request['nextToken'] = page['nextToken']
    except botocore.exceptions.ClientError as error:
        logger.error("Failed to describe image scan findings: %s", error)
        return []
# Getting the findings details for the report
def process_findings(findings):
    """Reduce raw findings to the four columns used in the report.

    Args:
        findings: Iterable of finding dicts.

    Returns:
        A list with one dict per finding, keyed "Finding ARN", "Description",
        "Severity" and "CVSS Base Score" (in that order). The score is the
        'baseScore' of the first entry under packageVulnerabilityDetails.cvss,
        or None when that list is missing or empty. Absent fields are None.
    """
    def first_base_score(entry):
        # Only the first CVSS entry is consulted.
        scores = entry.get('packageVulnerabilityDetails', {}).get('cvss', [])
        if not scores:
            return None
        return scores[0].get('baseScore')

    return [
        {
            "Finding ARN": entry.get('findingArn'),
            "Description": entry.get('description'),
            "Severity": entry.get('severity'),
            "CVSS Base Score": first_base_score(entry),
        }
        for entry in findings
    ]
# Normalize the list of findings into a table with pandas.json_normalize and save that table to an Excel file.
def save_to_excel(data, file_path):
    """Write the findings to an Excel workbook.

    The data is flattened with pandas.json_normalize and written without the
    index column.

    Args:
        data: List of finding dicts to write.
        file_path: Destination path of the workbook.

    Returns:
        True when the workbook was written, False when anything went wrong.
        Failures are logged with their traceback and never raised, so callers
        that ignore the return value behave as before.
    """
    try:
        df = pd.json_normalize(data)
        df.to_excel(file_path, index=False)
    except Exception as e:
        # Deliberately broad: writing the report is best-effort, but the
        # caller is told about the failure through the return value.
        logger.exception("Failed to save findings to Excel: %s", e)
        return False
    logger.info("Findings saved to Excel successfully")
    return True
# Defining a method to upload the previously generated Excel file of findings to the S3 bucket.
def upload_to_s3(file_path, bucket_name, object_name):
    """Upload a local file to an S3 bucket.

    Args:
        file_path: Path of the local file to upload.
        bucket_name: Destination bucket.
        object_name: Key to store the file under.

    Raises:
        ValueError: When S3 rejects the upload. The original boto3/botocore
            error is attached as the cause.
    """
    # Imported locally so this function does not rely on boto3.exceptions
    # being reachable as an attribute of the top-level boto3 import.
    from boto3.exceptions import S3UploadFailedError

    try:
        s3_client.upload_file(file_path, bucket_name, object_name)
    except (botocore.exceptions.ClientError, S3UploadFailedError) as error:
        # upload_file reports failed transfers as S3UploadFailedError rather
        # than ClientError, so both have to be handled here.
        logger.error("Failed to upload file to S3: %s", error)
        raise ValueError('An error occurred: {}'.format(error)) from error
    logger.info("File uploaded to S3 successfully")
# Generating a presigned URL for the uploaded report file
def generate_presigned_url(bucket_name, object_name):
    """Create a presigned GET URL for an object stored in S3.

    Args:
        bucket_name: Bucket that holds the object.
        object_name: Key of the object.

    Returns:
        The presigned URL, or None when the call raises a botocore
        ClientError (the error is logged).
    """
    # The URL is requested with a lifetime of one day.
    # NOTE(review): when the client signs with temporary credentials the link
    # may stop working earlier than this - confirm against the deployment.
    one_day_in_seconds = 86400
    target_object = {'Bucket': bucket_name, 'Key': object_name}
    try:
        link = s3_client.generate_presigned_url(
            'get_object',
            Params=target_object,
            ExpiresIn=one_day_in_seconds,
        )
    except botocore.exceptions.ClientError as error:
        logger.error("Failed to generate pre-signed URL: %s", error)
        return None
    logger.info("Presigned URL generated successfully")
    return link
# Publishing the message and URL to the SNS topic
def publish_to_sns(url, topic_arn):
    """Publish the report notification, including its URL, to an SNS topic.

    Args:
        url: Link to the report, embedded in the message text.
        topic_arn: ARN of the topic to publish to.

    Returns:
        The response of the publish call, or None when it raises a botocore
        ClientError (the error is logged).
    """
    notification = {
        'TopicArn': topic_arn,
        'Message': f"Hola, el reporte de hallazgos de ECR ya se encuentra listo. Puedes acceder a él mediante esta URL: {url}",
        'Subject': 'ECR Findings Report',
    }
    try:
        result = sns_client.publish(**notification)
    except botocore.exceptions.ClientError as error:
        logger.error("Failed to publish message to SNS: %s", error)
        return None
    logger.info("Message published to SNS successfully")
    return result
def _ensure_bucket_exists(bucket_name):
    """Create bucket_name unless list_buckets already reports it."""
    listed = s3_client.list_buckets().get('Buckets', [])
    if bucket_name in {bucket['Name'] for bucket in listed}:
        return
    create_args = {'ACL': 'private', 'Bucket': bucket_name}
    # Outside us-east-1, S3 requires the target region to be stated explicitly
    # when a bucket is created; without it the request is rejected.
    region = s3_client.meta.region_name
    if region and region != 'us-east-1':
        create_args['CreateBucketConfiguration'] = {'LocationConstraint': region}
    s3_client.create_bucket(**create_args)


def lambda_handler(event, context):
    """Build the ECR findings report, upload it to S3 and announce it on SNS.

    Configuration is read from the BUCKET_NAME, REPOSITORY_NAME and TOPIC_ARN
    environment variables. The findings of every image in the repository are
    collected, written to an Excel workbook under /tmp, uploaded under a
    timestamped key, and a presigned link to the upload is published to the
    SNS topic.

    Args:
        event: Invocation event (not used).
        context: Invocation context (not used).

    Returns:
        A dict with 'statusCode' and 'body':
        400 when a required environment variable is missing or empty,
        500 when the registry id cannot be obtained, the report cannot be
        written, or the bucket/upload step fails,
        404 when there are no images or no findings,
        200 when the report was uploaded.
    """
    # .get() instead of indexing: a variable that is not set at all must end
    # in the 400 response below rather than in an unhandled KeyError.
    bucket_name = os.environ.get("BUCKET_NAME")
    repository_name = os.environ.get("REPOSITORY_NAME")
    topic_arn = os.environ.get("TOPIC_ARN")
    if not all([bucket_name, repository_name, topic_arn]):
        logger.error("Missing required parameters: bucket_name, repository_name, and topic_arn")
        return {
            'statusCode': 400,
            'body': 'Missing required parameters: bucket_name, repository_name, and topic_arn'
        }

    registry_id = get_registry_id()
    if not registry_id:
        return {
            'statusCode': 500,
            'body': 'Failed to get registry ID'
        }

    image_ids = get_list_images(registry_id, repository_name)
    if not image_ids:
        return {
            'statusCode': 404,
            'body': 'No images found'
        }

    all_findings = []
    for image in image_ids:
        image_findings = get_image_scan_findings(registry_id, repository_name, image)
        all_findings.extend(process_findings(image_findings))
    if not all_findings:
        logger.info("No findings found.")
        return {
            'statusCode': 404,
            'body': 'No findings found'
        }

    file_path = '/tmp/ecr_findings_report.xlsx'
    # Adding a timestamp for the generated report every time it is uploaded.
    timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
    object_name = f'ecr_findings_report_{timestamp}.xlsx'

    # The path is fixed, so a workbook left behind by an earlier invocation
    # (the execution environment may be reused) is removed first; otherwise a
    # failed save could go unnoticed and the old report would be uploaded.
    if os.path.exists(file_path):
        os.remove(file_path)
    saved = save_to_excel(all_findings, file_path)
    # save_to_excel logs its own errors instead of raising, so the outcome is
    # checked both through its return value and on disk.
    if saved is False or not os.path.exists(file_path):
        logger.error("Findings report was not written to %s", file_path)
        return {
            'statusCode': 500,
            'body': 'Failed to generate findings report'
        }

    try:
        # Check if bucket exists and create it if it does not
        _ensure_bucket_exists(bucket_name)
        upload_to_s3(file_path, bucket_name, object_name)
        # Generating a presigned URL
        url = generate_presigned_url(bucket_name, object_name)
        if url:
            publish_to_sns(url, topic_arn)
    except (botocore.exceptions.ClientError, ValueError) as error:
        # upload_to_s3 reports its failures as ValueError.
        logger.error("An error occurred: %s", error)
        return {
            'statusCode': 500,
            'body': f'Error creating bucket or uploading file: {error}'
        }
    return {
        'statusCode': 200,
        'body': 'Findings processed and uploaded successfully'
    }