Skip to content

Commit

Permalink
test: tools: neuvector: support vulnerability asset format
Browse files Browse the repository at this point in the history
  • Loading branch information
pna-nca committed May 6, 2024
1 parent 3812e56 commit 598fa43
Show file tree
Hide file tree
Showing 2 changed files with 169 additions and 5 deletions.
94 changes: 92 additions & 2 deletions dojo/tools/neuvector/parser.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import json
import logging

from dojo.models import Finding
from datetime import datetime

from dojo.models import Endpoint, Finding

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -32,6 +34,7 @@ def parse_json(self, json_output):

def get_items(self, tree, test):
items = {}
# old-style report with vulnerabilities of an endpoint
if "report" in tree:
vulnerabilityTree = tree.get("report").get("vulnerabilities", [])
for node in vulnerabilityTree:
Expand All @@ -45,6 +48,13 @@ def get_items(self, tree, test):
+ str(node.get("severity"))
)
items[unique_key] = item
# asset-style collection with vulnerabilities of several assets
if "vulnerabilities" in tree:
vulnerabilityTree = tree.get("vulnerabilities", [])
for node in vulnerabilityTree:
item = get_asset_item(node, test)
unique_key = node.get("name") + str(node.get("severity"))
items[unique_key] = item
return list(items.values())


Expand Down Expand Up @@ -113,6 +123,86 @@ def get_item(vulnerability, test):
return finding


def get_asset_item(vulnerability, test):
severity = (
convert_severity(vulnerability.get("severity"))
if "severity" in vulnerability
else "Info"
)

feed_rating = vulnerability.get("feed_rating", "")

description = vulnerability.get("description", "").strip()

if len(feed_rating) > 0:
description += "<p>Rating from vendor: {rating}</p>".format(rating=feed_rating)

mitigation = ""

package_names = []

packages = vulnerability.get("packages", {})
if len(packages.values()) > 0:
mitigation += "<p>update the affected packages to the following versions:</p>"
description += "<p>The following packages are affected:</p>"

for package_name, package_versions in packages.items():
package_names.append(package_name.split('/')[0])

mitigation += "<p>{name}:</p>".format(name=package_name)

description += "<p>{name}:</p>".format(name=package_name)
for versions in package_versions:
mitigation += "<p> {fixed}</p>".format(fixed=versions.get("fixed_version", "unknown"))

description += "<p> installed version: {installed}</p>".format(installed=versions.get("package_version", "unknown"))
description += "<p> fixed version: {fixed}</p>".format(fixed=versions.get("fixed_version", "unknown"))

link = vulnerability.get("link") if "link" in vulnerability else ""

vectors_v3 = vulnerability.get("vectors_v3", "")

score_v3 = vulnerability.get("score_v3", "")

published = datetime.fromtimestamp(int(vulnerability.get("published_timestamp", 0)))

vulnerability_id = vulnerability.get("name")

# there is nothing like short description, short name or title
package_names_combined = ','.join(sorted(set(package_names), key=str))
if len(package_names_combined) > 32:
package_names_combined = package_names_combined[-32:]

title = "{packages}: ({vuln})".format(packages=package_names_combined, vuln=vulnerability.get("name").upper())

# create the finding object
finding = Finding(
title=title,
test=test,
description=description,
severity=severity,
mitigation=mitigation,
impact="",
url=link,
cvssv3=vectors_v3,
cvssv3_score=score_v3,
publish_date=published,
)

finding.unsaved_vulnerability_ids = [vulnerability_id]

finding.unsaved_endpoints = []

nodes = vulnerability.get("nodes", [])
for node in nodes:
endpoint = Endpoint(
host=node.get("display_name", ""),
)
finding.unsaved_endpoints.append(endpoint)

return finding


# see neuvector/share/types.go
def convert_severity(severity):
if severity.lower() == "critical":
Expand All @@ -137,7 +227,7 @@ def get_label_for_scan_types(self, scan_type):
return NEUVECTOR_SCAN_NAME

def get_description_for_scan_types(self, scan_type):
return "JSON output of /v1/scan/{entity}/{id} endpoint."
return "JSON output of /v1/scan/{entity}/{id} endpoint (vulnerabilities of an endpoint). Or vulnerabilities of several assets (VulnerabilityAsset / ComplianceAsset)."

def get_findings(self, filename, test):
if filename is None:
Expand Down
80 changes: 77 additions & 3 deletions dojo/tools/neuvector_compliance/parser.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import hashlib
import json

from dojo.models import Finding
from dojo.models import Endpoint,Finding

Check failure on line 4 in dojo/tools/neuvector_compliance/parser.py

View workflow job for this annotation

GitHub Actions / ruff-linting

Ruff (E231)

dojo/tools/neuvector_compliance/parser.py:4:33: E231 Missing whitespace after ','


NEUVECTOR_SCAN_NAME = "NeuVector (compliance)"
Expand Down Expand Up @@ -35,10 +35,10 @@ def get_items(tree, test):
# endpoints like /v1/scan/workload/{id}. otherwize, it is an export from
# /v1/host/{id}/compliance or similar. thus, we need to support items in a
# bit different leafs.
testsTree = None
testsTree = []
if "report" in tree:
testsTree = tree.get("report").get("checks", [])
else:
elif "items" in tree:
testsTree = tree.get("items", [])

for node in testsTree:
Expand All @@ -51,6 +51,23 @@ def get_items(tree, test):
)
unique_key = hashlib.md5(unique_key.encode("utf-8")).hexdigest()
items[unique_key] = item

# asset-style collection with compliance issues of several assets
testsAssetsTree = []
if "compliance_issues" in tree:
testsAssetsTree = tree.get("compliance_issues", [])
for node in testsAssetsTree:
item = get_asset_item(node, test)
unique_key = (
node.get("name")
+ node.get("category")
+ node.get("type")
+ node.get("level")
+ node.get("profile")
)
unique_key = hashlib.md5(unique_key.encode("utf-8")).hexdigest()
items[unique_key] = item

return list(items.values())


Expand Down Expand Up @@ -117,6 +134,63 @@ def get_item(node, test):
return finding


def get_asset_item(comp_issue, test):
test_name = comp_issue.get("name", "unknown")

test_description = comp_issue.get("description", "").rstrip()

test_severity = comp_issue.get("level", "unknown")

mitigation = comp_issue.get("remediation", "").rstrip()

category = comp_issue.get("category", "unknown")

test_profile = comp_issue.get("profile", "unknown")

full_description = "<p>{} ({}), {}:</p>".format(
test_name, category, test_profile
)
full_description += "<p>{}</p>".format(test_description)
full_description += "<p>Audit: {}</p>".format(test_severity)
full_description += "<p>Mitigation:</p>"
full_description += "<p> {}</p>".format(mitigation)

tags = comp_issue.get("tags", [])
if len(tags) > 0:
full_description += "<p>Applicable compliance standards: {tags}</p>".format(tags=','.join(sorted(set(tags), key=str)))

messages = comp_issue.get("message", [])
if len(messages) > 0:
full_description += "<p>Messages:</p>"
for m in messages:
full_description += "<p> {}</p>".format(str(m).rstrip())

finding = Finding(
title="{name} - {desc}".format(name=test_name, desc=test_description),
test=test,
description=full_description,
severity=convert_severity(test_severity),
mitigation=mitigation,
vuln_id_from_tool="{category}_{name}".format(category=category, name=test_name),
impact="",
static_finding=True,
dynamic_finding=False,
)

finding.unsaved_vulnerability_ids = []

finding.unsaved_endpoints = []

nodes = comp_issue.get("nodes", [])
for node in nodes:
endpoint = Endpoint(
host=node.get("display_name", ""),
)
finding.unsaved_endpoints.append(endpoint)

return finding


# see neuvector/share/clus_apis.go
def convert_severity(severity):
if severity.lower() == "high":
Expand Down

0 comments on commit 598fa43

Please sign in to comment.