Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Populate cvss #3147

Merged
merged 10 commits into from
Jul 26, 2023
49 changes: 49 additions & 0 deletions cve_bin_tool/cvedb.py
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,7 @@ def populate_db(self) -> None:

if severity_data is not None and len(severity_data) > 0:
self.populate_severity(severity_data, cursor, data_source=source_name)
self.populate_cve_metrics(severity_data, cursor)
if affected_data is not None:
self.populate_affected(
affected_data,
Expand Down Expand Up @@ -529,6 +530,36 @@ def populate_severity(self, severity_data, cursor, data_source):
for cve in severity_data:
cursor.execute(del_cve_range, [cve["ID"], data_source])

def populate_cve_metrics(self, severity_data, cursor):
insert_cve_metrics = self.INSERT_QUERIES["insert_cve_metrics"]

for cve in severity_data:
# Check no None values
if not bool(cve.get("score")):
LOGGER.debug(f"Update score for {cve['ID']}")
cve["score"] = "unknown"
if not bool(cve.get("CVSS_version")):
LOGGER.debug(f"Update CVSS version for {cve['ID']}")
cve["CVSS_version"] = "unknown"
if not bool(cve.get("CVSS_vector")):
LOGGER.debug(f"Update CVSS Vector for {cve['ID']}")
cve["CVSS_vector"] = "unknown"

for cve in severity_data:
try:
metric = self.metric_finder(cursor, cve)
cursor.execute(
insert_cve_metrics,
[
cve["ID"],
metric,
cve["score"],
cve["CVSS_vector"],
],
)
except Exception as e:
LOGGER.info(f"Unable to insert data for {e}\n{cve}")

def populate_affected(self, affected_data, cursor, data_source):
insert_cve_range = self.INSERT_QUERIES["insert_cve_range"]
try:
Expand Down Expand Up @@ -567,6 +598,24 @@ def populate_metrics(self):
self.connection.commit()
self.db_close()

def metric_finder(self, cursor, cve):
# SQL query to retrieve the metrics_name based on the metrics_id
# currently cve["CVSS_version"] return 2,3 based on there version and they are mapped accordingly to there metrics name in metrics table.
query = """
SELECT metrics_id FROM metrics
WHERE metrics_id=?
"""
metric = None
if cve["CVSS_version"] == "unknown":
metric = "unknown"
else:
cursor.execute(query, [cve.get("CVSS_version")])
# Fetch all the results of the query and use 'map' to extract only the 'metrics_name' from the result
metric = list(map(lambda x: x[0], cursor.fetchall()))
# Since the query is expected to return a single result, extract the first item from the list and store it in 'metric'
metric = metric[0]
return metric

def clear_cached_data(self) -> None:
self.create_cache_backup()
if self.cachedir.exists():
Expand Down