Skip to content

Commit

Permalink
tools: neuvector_compliance: create compliance finding in the same ma…
Browse files Browse the repository at this point in the history
…nner as regular ones
  • Loading branch information
pna-nca committed Oct 24, 2024
1 parent d51645d commit 11970eb
Showing 1 changed file with 126 additions and 28 deletions.
154 changes: 126 additions & 28 deletions dojo/tools/neuvector_compliance/parser.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import hashlib
import json
import textwrap

from dojo.models import Endpoint,Finding

Expand Down Expand Up @@ -129,64 +130,161 @@ def get_item(node, test):
dynamic_finding=False,
)

COMPLIANCE_ASSET_FINDING_DESCRIPTION_TEMPLATE = """**Name:** {name}
**Details:**
{description}
**Audit:** {severity}
**Mitigation**: {mitigation}
**Applicable compliance standards**: {tags}
**Message:**
{message}
**Affected systems:**
{affected_systems}
"""

def get_asset_item(comp_issue, test):
test_name = comp_issue.get("name", "unknown")

def get_asset_item(comp_issue, test):
test_name = comp_issue.get("name", "unknown name")
test_description = comp_issue.get("description", "").rstrip()

test_severity = comp_issue.get("level", "unknown")
test_severity = comp_issue.get("level", "")
severity = convert_severity(test_severity)

mitigation = comp_issue.get("remediation", "").rstrip()

category = comp_issue.get("category", "unknown")

test_profile = comp_issue.get("profile", "unknown")
test_group = comp_issue.get("group", "unknown")

full_description = "<p>{} ({}), {}:</p>".format(
test_name, category, test_profile
)
full_description += "<p>{}</p>".format(test_description)
full_description += "<p>Audit: {}</p>".format(test_severity)
full_description += "<p>Mitigation:</p>"
full_description += "<p> {}</p>".format(mitigation)
vuln_id_from_tool = f"{category}_{test_name}"

tags = comp_issue.get("tags", [])
if len(tags) > 0:
full_description += "<p>Applicable compliance standards: {tags}</p>".format(tags=','.join(sorted(set(tags), key=str)))
test_profile = comp_issue.get("profile", "unknown profile")

tags = comp_issue.get("tags", [])
messages = comp_issue.get("message", [])
if len(messages) > 0:
full_description += "<p>Messages:</p>"
for m in messages:
full_description += "<p> {}</p>".format(str(m).rstrip())

nodes = comp_issue.get("nodes", [])
workloads = comp_issue.get("workloads", [])
images = comp_issue.get("images", [])
platforms = comp_issue.get("platforms", [])

# the same information is saved as Endpoint(s), however, DefectDojo
# Endpoint lacks many metadata fields, thus, difficult to read.
affected_systems = ""

for asset in nodes:
display_name = asset.get("display_name", "")
domains = ','.join(asset.get("domains", []))
affected_systems += f"*Node {display_name}*\n"
affected_systems += f" domains: {domains}\n"

for asset in platforms:
display_name = asset.get("display_name", "")
domains = ','.join(asset.get("domains", []))
affected_systems += f"*Platform {display_name}*\n"
affected_systems += f" domains: {domains}\n"

for asset in images:
display_name = asset.get("display_name", "")
domains = ','.join(asset.get("domains", []))
affected_systems += f"*Image {display_name}*\n"
affected_systems += f" domains: {domains}\n"

for asset in workloads:
display_name = asset.get("display_name", "")
domains = ','.join(asset.get("domains", []))
service = asset.get("service", "")
image = asset.get("image", "")
affected_systems += f"*Workload {display_name}*\n"
affected_systems += f" domains: {domains}\n"
affected_systems += f" service: {service}\n"
affected_systems += f" image: {image}\n"

full_description = COMPLIANCE_ASSET_FINDING_DESCRIPTION_TEMPLATE.format(
name=f"{test_name} ({category}), {test_profile}, {test_group}",
description=test_description,
severity=test_severity,
mitigation=mitigation,
tags=';'.join(tags),
message="\n".join(messages),
affected_systems=affected_systems,
)

finding = Finding(
title="{name} - {desc}".format(name=test_name, desc=test_description),
title=textwrap.shorten(f"{test_name} - {test_description}", width=64, placeholder="..."),
test=test,
description=full_description,
severity=convert_severity(test_severity),
severity=severity,
mitigation=mitigation,
vuln_id_from_tool="{category}_{name}".format(category=category, name=test_name),
vuln_id_from_tool=vuln_id_from_tool,
impact="",
static_finding=True,
dynamic_finding=False,
)

finding.unsaved_vulnerability_ids = []
finding.unsaved_vulnerability_ids = [vuln_id_from_tool]

finding.unsaved_endpoints = []

nodes = comp_issue.get("nodes", [])
for node in nodes:
endpoint = Endpoint(
host=node.get("display_name", ""),
)
finding.unsaved_endpoints.append(endpoint)
for asset in nodes:
endpoints = endpoints_from_asset("node", asset)
finding.unsaved_endpoints += endpoints

for asset in workloads:
endpoints = endpoints_from_asset("workload", asset)
finding.unsaved_endpoints += endpoints

for asset in images:
endpoints = endpoints_from_asset("image", asset)
finding.unsaved_endpoints += endpoints

for asset in platforms:
endpoints = endpoints_from_asset("platform", asset)
finding.unsaved_endpoints += endpoints

return finding


def endpoints_from_asset(kind, asset):
endpoints = []

# usually, there is only one namespace (domain, as NeuVector name it)
namespaces = asset.get("domains", [])

name = asset.get("display_name", "")

if kind == "workload":
# only workload assets have 'service' field
service = asset.get("service", "unknown_service")
name += f"/{service}"

# in principle, naming follows the approach chosen for trivy parser
endpoints.append(Endpoint(
# host needs to comply with domain name syntax, we just expect that
# there will be only one namespace
host='-'.join(namespaces),
# we abuse path to have as much details as possible
path=f"{kind}/{name}",
))

# if it is a workload and it has an associated image, add image as a
# separate endpoint
if kind == "workload" and asset.get("image", "") != "":
image = asset.get("image", "unknown_image")
# image value example:
# someregistry.com/bitnami/postgresql:11.21.0-debian-11-r58
artifact_and_tag = image.split("/")[-1]
# extracting only image name, without tag or digest
artifact_name = artifact_and_tag.split("@")[0]
artifact_name = artifact_name.split(":")[0]

endpoints.append(Endpoint(
host=f"{artifact_name}",
path=f"{image}",
))

return endpoints

# see neuvector/share/clus_apis.go
def convert_severity(severity):
if severity.lower() == "high":
Expand Down

0 comments on commit 11970eb

Please sign in to comment.