diff --git a/cve_bin_tool/cvedb.py b/cve_bin_tool/cvedb.py index 8e7e7bd9a9..f0b180230a 100644 --- a/cve_bin_tool/cvedb.py +++ b/cve_bin_tool/cvedb.py @@ -475,6 +475,7 @@ def populate_db(self) -> None: if severity_data is not None and len(severity_data) > 0: self.populate_severity(severity_data, cursor, data_source=source_name) + self.populate_cve_metrics(severity_data, cursor) if affected_data is not None: self.populate_affected( affected_data, @@ -529,6 +530,36 @@ def populate_severity(self, severity_data, cursor, data_source): for cve in severity_data: cursor.execute(del_cve_range, [cve["ID"], data_source]) + def populate_cve_metrics(self, severity_data, cursor): + insert_cve_metrics = self.INSERT_QUERIES["insert_cve_metrics"] + + for cve in severity_data: + # Check no None values + if not bool(cve.get("score")): + LOGGER.debug(f"Update score for {cve['ID']}") + cve["score"] = "unknown" + if not bool(cve.get("CVSS_version")): + LOGGER.debug(f"Update CVSS version for {cve['ID']}") + cve["CVSS_version"] = "unknown" + if not bool(cve.get("CVSS_vector")): + LOGGER.debug(f"Update CVSS Vector for {cve['ID']}") + cve["CVSS_vector"] = "unknown" + + for cve in severity_data: + try: + metric = self.metric_finder(cursor, cve) + cursor.execute( + insert_cve_metrics, + [ + cve["ID"], + metric, + cve["score"], + cve["CVSS_vector"], + ], + ) + except Exception as e: + LOGGER.info(f"Unable to insert data for {e}\n{cve}") + def populate_affected(self, affected_data, cursor, data_source): insert_cve_range = self.INSERT_QUERIES["insert_cve_range"] try: @@ -567,6 +598,24 @@ def populate_metrics(self): self.connection.commit() self.db_close() + def metric_finder(self, cursor, cve): + # SQL query to retrieve the metrics_name based on the metrics_id + # currently cve["CVSS_version"] return 2,3 based on there version and they are mapped accordingly to there metrics name in metrics table. + query = """ + SELECT metrics_id FROM metrics + WHERE metrics_id=? + """ + metric = None + if cve["CVSS_version"] == "unknown": + metric = "unknown" + else: + cursor.execute(query, [cve.get("CVSS_version")]) + # Fetch all the results of the query and use 'map' to extract only the 'metrics_name' from the result + metric = list(map(lambda x: x[0], cursor.fetchall())) + # Since the query is expected to return a single result, extract the first item from the list and store it in 'metric' + metric = metric[0] + return metric + def clear_cached_data(self) -> None: self.create_cache_backup() if self.cachedir.exists():