diff --git a/cve_bin_tool/parsers/__init__.py b/cve_bin_tool/parsers/__init__.py index fc3887289b..c44ebe8420 100644 --- a/cve_bin_tool/parsers/__init__.py +++ b/cve_bin_tool/parsers/__init__.py @@ -1,8 +1,14 @@ # Copyright (C) 2022 Intel Corporation # SPDX-License-Identifier: GPL-3.0-or-later +import re +import sqlite3 +from pathlib import Path +from typing import List, Tuple + from packageurl import PackageURL +from cve_bin_tool.error_handler import CVEDBError from cve_bin_tool.util import ProductInfo, ScanInfo __all__ = [ @@ -89,3 +95,62 @@ def generate_purl(self, product, vendor, qualifier={}, subpath=None): subpath=subpath, ) return purl + + def find_vendor_from_purl(self, purl, ver) -> Tuple[List[ScanInfo], bool]: + """ + Finds the vendor information for a given PackageURL (purl) and version from the database. + + This method queries the database to retrieve Common Platform Enumeration (CPE) data associated with the given purl. + It then decodes the CPE data to extract vendor, product, and version information. If the version matches the provided + version, it constructs a ScanInfo object for each matching entry and returns a list of these objects. + """ + + query = "SELECT cpe from purl2cpe WHERE purl=?" + cursor = self.db_open_and_get_cursor() + cursor.execute(query, [str(purl)]) + cpeList = cursor.fetchall() + vendorlist: list[ScanInfo] = [] + vendors = set() + + if cpeList != []: + for item in cpeList: + vendor, product, version = self.decode_cpe23(str(item)) + vendors.add((vendor, product)) + else: + return vendorlist, False + + for vendor, product in vendors: + vendorlist.append( + ScanInfo( + ProductInfo(vendor, product, ver, "/usr/local/bin/product"), + self.filename, + ) + ) + + return vendorlist, True + + def db_open_and_get_cursor(self) -> sqlite3.Cursor: + """Opens connection to sqlite database, returns cursor object.""" + + dbpath = ( + Path("~").expanduser() / ".cache" / "cve-bin-tool" / "purl2cpe/purl2cpe.db" + ) + connection = sqlite3.connect(dbpath) + + if connection is not None: + cursor = connection.cursor() + if cursor is None: + raise CVEDBError + return cursor + + def decode_cpe23(self, cpe23) -> Tuple[str, str, str]: + """ + Decodes a CPE 2.3 formatted string to extract vendor, product, and version information. + + """ + + # split on `:` only if it's not escaped + cpe = re.split(r"(?