Skip to content

Commit

Permalink
feat: Populate cvss (intel#3147)
Browse files Browse the repository at this point in the history
* fixes: intel#3146
  • Loading branch information
Rexbeast2 authored Jul 26, 2023
1 parent e82d7ea commit e2d1ef7
Showing 1 changed file with 49 additions and 0 deletions.
49 changes: 49 additions & 0 deletions cve_bin_tool/cvedb.py
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,7 @@ def populate_db(self) -> None:

if severity_data is not None and len(severity_data) > 0:
self.populate_severity(severity_data, cursor, data_source=source_name)
self.populate_cve_metrics(severity_data, cursor)
if affected_data is not None:
self.populate_affected(
affected_data,
Expand Down Expand Up @@ -529,6 +530,36 @@ def populate_severity(self, severity_data, cursor, data_source):
for cve in severity_data:
cursor.execute(del_cve_range, [cve["ID"], data_source])

def populate_cve_metrics(self, severity_data, cursor):
insert_cve_metrics = self.INSERT_QUERIES["insert_cve_metrics"]

for cve in severity_data:
# Check no None values
if not bool(cve.get("score")):
LOGGER.debug(f"Update score for {cve['ID']}")
cve["score"] = "unknown"
if not bool(cve.get("CVSS_version")):
LOGGER.debug(f"Update CVSS version for {cve['ID']}")
cve["CVSS_version"] = "unknown"
if not bool(cve.get("CVSS_vector")):
LOGGER.debug(f"Update CVSS Vector for {cve['ID']}")
cve["CVSS_vector"] = "unknown"

for cve in severity_data:
try:
metric = self.metric_finder(cursor, cve)
cursor.execute(
insert_cve_metrics,
[
cve["ID"],
metric,
cve["score"],
cve["CVSS_vector"],
],
)
except Exception as e:
LOGGER.info(f"Unable to insert data for {e}\n{cve}")

def populate_affected(self, affected_data, cursor, data_source):
insert_cve_range = self.INSERT_QUERIES["insert_cve_range"]
try:
Expand Down Expand Up @@ -567,6 +598,24 @@ def populate_metrics(self):
self.connection.commit()
self.db_close()

def metric_finder(self, cursor, cve):
# SQL query to retrieve the metrics_name based on the metrics_id
# currently cve["CVSS_version"] return 2,3 based on there version and they are mapped accordingly to there metrics name in metrics table.
query = """
SELECT metrics_id FROM metrics
WHERE metrics_id=?
"""
metric = None
if cve["CVSS_version"] == "unknown":
metric = "unknown"
else:
cursor.execute(query, [cve.get("CVSS_version")])
# Fetch all the results of the query and use 'map' to extract only the 'metrics_name' from the result
metric = list(map(lambda x: x[0], cursor.fetchall()))
# Since the query is expected to return a single result, extract the first item from the list and store it in 'metric'
metric = metric[0]
return metric

def clear_cached_data(self) -> None:
self.create_cache_backup()
if self.cachedir.exists():
Expand Down

0 comments on commit e2d1ef7

Please sign in to comment.