Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Populate cvss #3147

Merged
merged 10 commits into from
Jul 26, 2023
173 changes: 95 additions & 78 deletions cve_bin_tool/cvedb.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,60 @@ class CVEDB:
gad_source.GAD_Source,
]

INSERT_QUERIES = {
"insert_severity": """
INSERT or REPLACE INTO cve_severity(
CVE_number,
severity,
description,
score,
cvss_version,
cvss_vector,
data_source,
last_modified
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""",
"insert_cve_range": """
INSERT or REPLACE INTO cve_range(
cve_number,
vendor,
product,
version,
versionStartIncluding,
versionStartExcluding,
versionEndIncluding,
versionEndExcluding,
data_source
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
"insert_exploit": """
INSERT or REPLACE INTO cve_exploited (
cve_number,
product,
description
)
VALUES (?,?,?)
""",
"insert_cve_metrics": """
INSERT or REPLACE INTO cve_metrics (
cve_number,
metric_id,
metric_score,
metric_field
)
VALUES (?, ?, ?, ?)
""",
"insert_metrics": """
INSERT or REPLACE INTO metrics (
metrics_id,
metrics_name
)
VALUES (?, ?)
""",
}

def __init__(
self,
sources=None,
Expand Down Expand Up @@ -316,66 +370,6 @@ def table_schemas(self):
metrics_table,
)

def insert_queries(self):
cve_severity = """
cve_severity(
CVE_number,
severity,
description,
score,
cvss_version,
cvss_vector,
data_source,
last_modified
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
"""
insert_severity = f"INSERT or REPLACE INTO {cve_severity}"
insert_cve_range = """
INSERT or REPLACE INTO cve_range(
cve_number,
vendor,
product,
version,
versionStartIncluding,
versionStartExcluding,
versionEndIncluding,
versionEndExcluding,
data_source
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
"""
insert_exploit = """
INSERT or REPLACE INTO cve_exploited (
cve_number,
product,
description
)
VALUES (?,?,?)
"""
insert_cve_metrics = """
INSERT or REPLACE INTO cve_metrics (
cve_number,
metric_id,
metric_score,
metric_field
)
VALUES (?, ?, ?, ?)
"""
insert_metrics = """
INSERT or REPLACE INTO metrics (
metrics_id,
metrics_name
)
VALUES (?, ?)
"""
return (
insert_severity,
insert_cve_range,
insert_exploit,
insert_cve_metrics,
insert_metrics,
)

def init_database(self) -> None:
"""Initialize db tables used for storing cve/version data"""

Expand Down Expand Up @@ -481,6 +475,7 @@ def populate_db(self) -> None:

if severity_data is not None and len(severity_data) > 0:
self.populate_severity(severity_data, cursor, data_source=source_name)
self.populate_cve_metrics(severity_data, cursor)
if affected_data is not None:
self.populate_affected(
affected_data,
Expand All @@ -492,8 +487,7 @@ def populate_db(self) -> None:
self.db_close()

def populate_severity(self, severity_data, cursor, data_source):
(insert_severity, _, _, _, _) = self.insert_queries()
del_cve_range = "DELETE from cve_range where CVE_number=?"
insert_severity = self.INSERT_QUERIES["insert_severity"]

for cve in severity_data:
# Check no None values
Expand Down Expand Up @@ -531,11 +525,41 @@ def populate_severity(self, severity_data, cursor, data_source):
except Exception as e:
LOGGER.info(f"Unable to insert data for {data_source} - {e}\n{cve}")

def populate_cve_metrics(self, severity_data, cursor):
insert_cve_metrics = self.INSERT_QUERIES["insert_cve_metrics"]
del_cve_range = "DELETE from cve_range where CVE_number=?"

for cve in severity_data:
# Check no None values
if not bool(cve.get("score")):
LOGGER.debug(f"Update score for {cve['ID']}")
cve["score"] = "unknown"
if not bool(cve.get("CVSS_version")):
LOGGER.debug(f"Update CVSS version for {cve['ID']}")
cve["CVSS_version"] = "unknown"
if not bool(cve.get("CVSS_vector")):
LOGGER.debug(f"Update CVSS Vector for {cve['ID']}")
cve["CVSS_vector"] = "unknown"

for cve in severity_data:
try:
cursor.execute(
insert_cve_metrics,
[
cve["ID"],
cve["CVSS_version"],
Copy link
Contributor

@anthonyharrison anthonyharrison Jul 11, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The second parameter of the cve_metrics table should be referencing the name of the metric in the metrics table and not be the metric name

For a CVSS-3 CVE, we should have CVE_number, 3, score, vector where 3 is the INDEX of the CVSS-3 string in the metrics table (note that the value 3 should be obtained from the metrics table and not hard-coded)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok sure, I will update it.

cve["score"],
cve["CVSS_vector"],
],
)
except Exception as e:
LOGGER.info(f"Unable to insert data for {e}\n{cve}")

# Delete any old range entries for this CVE_number
cursor.executemany(del_cve_range, [(cve["ID"],) for cve in severity_data])

def populate_affected(self, affected_data, cursor, data_source):
(_, insert_cve_range, _, _, _) = self.insert_queries()
insert_cve_range = self.INSERT_QUERIES["insert_cve_range"]
try:
cursor.executemany(
insert_cve_range,
Expand All @@ -560,7 +584,7 @@ def populate_affected(self, affected_data, cursor, data_source):
def populate_metrics(self):
cursor = self.db_open_and_get_cursor()
# Insert a row without specifying cve_metrics_id
(_, _, _, _, insert_metrics) = self.insert_queries()
insert_metrics = self.INSERT_QUERIES["insert_metrics"]
data = [
(1, "EPSS"),
(2, "CVSS-2"),
Expand Down Expand Up @@ -762,14 +786,14 @@ def create_exploit_db(self):
self.db_close()

def populate_exploit_db(self, exploits):
(_, _, insert_exploit, _, _) = self.insert_queries()
insert_exploit = self.INSERT_QUERIES["insert_exploit"]
cursor = self.db_open_and_get_cursor()
cursor.executemany(insert_exploit, exploits)
self.connection.commit()
self.db_close()

def store_epss_data(self):
(_, _, _, insert_cve_metrics, _) = self.insert_queries()
insert_cve_metrics = self.INSERT_QUERIES["insert_cve_metrics"]
cursor = self.db_open_and_get_cursor()
cursor.executemany(insert_cve_metrics, self.epss_data)
self.connection.commit()
Expand Down Expand Up @@ -923,13 +947,6 @@ def db_to_json(self, path, private_key, passphrase):
shutil.rmtree(temp_gnupg_home)

def json_to_db(self, cursor, db_column, json_data):
(
insert_severity,
insert_cve_range,
insert_exploit,
insert_cve_metrics,
insert_metrics,
) = self.insert_queries()
columns = []
for data in json_data:
column = list(data.keys())
Expand All @@ -945,15 +962,15 @@ def json_to_db(self, cursor, db_column, json_data):
values.append(list(value))

if db_column == "cve_exploited":
cursor.executemany(insert_exploit, values)
cursor.executemany(self.INSERT_QUERIES["insert_exploit"], values)
elif db_column == "cve_range":
cursor.executemany(insert_cve_range, values)
cursor.executemany(self.INSERT_QUERIES["insert_cve_range"], values)
elif db_column == "cve_severity":
cursor.executemany(insert_severity, values)
cursor.executemany(self.INSERT_QUERIES["insert_severity"], values)
elif db_column == "cve_metrics":
cursor.executemany(insert_cve_metrics, values)
cursor.executemany(self.INSERT_QUERIES["insert_cve_metrics"], values)
elif db_column == "metrics":
cursor.executemany(insert_metrics, values)
cursor.executemany(self.INSERT_QUERIES["insert_metrics"], values)

def json_to_db_wrapper(self, path, pubkey, ignore_signature, log_signature_error):
try:
Expand Down