From 11970ebf1273d1fa9acea92a7f5737ecb883e9dd Mon Sep 17 00:00:00 2001 From: Pavel Nakonechnyi Date: Thu, 24 Oct 2024 15:00:09 +0200 Subject: [PATCH] tools: neuvector_compliance: create compliance finding in the same manner as regular ones --- dojo/tools/neuvector_compliance/parser.py | 154 ++++++++++++++++++---- 1 file changed, 126 insertions(+), 28 deletions(-) diff --git a/dojo/tools/neuvector_compliance/parser.py b/dojo/tools/neuvector_compliance/parser.py index 05cfe29baa..cdae7a66c7 100644 --- a/dojo/tools/neuvector_compliance/parser.py +++ b/dojo/tools/neuvector_compliance/parser.py @@ -1,5 +1,6 @@ import hashlib import json +import textwrap from dojo.models import Endpoint,Finding @@ -129,64 +130,161 @@ def get_item(node, test): dynamic_finding=False, ) +COMPLIANCE_ASSET_FINDING_DESCRIPTION_TEMPLATE = """**Name:** {name} +**Details:** +{description} +**Audit:** {severity} +**Mitigation**: {mitigation} +**Applicable compliance standards**: {tags} +**Message:** +{message} +**Affected systems:** +{affected_systems} +""" -def get_asset_item(comp_issue, test): - test_name = comp_issue.get("name", "unknown") +def get_asset_item(comp_issue, test): + test_name = comp_issue.get("name", "unknown name") test_description = comp_issue.get("description", "").rstrip() - test_severity = comp_issue.get("level", "unknown") + test_severity = comp_issue.get("level", "") + severity = convert_severity(test_severity) mitigation = comp_issue.get("remediation", "").rstrip() category = comp_issue.get("category", "unknown") - test_profile = comp_issue.get("profile", "unknown") + test_group = comp_issue.get("group", "unknown") - full_description = "

{} ({}), {}:

".format( - test_name, category, test_profile - ) - full_description += "

{}

".format(test_description) - full_description += "

Audit: {}

".format(test_severity) - full_description += "

Mitigation:

" - full_description += "

{}

".format(mitigation) + vuln_id_from_tool = f"{category}_{test_name}" - tags = comp_issue.get("tags", []) - if len(tags) > 0: - full_description += "

Applicable compliance standards: {tags}

".format(tags=','.join(sorted(set(tags), key=str))) + test_profile = comp_issue.get("profile", "unknown profile") + tags = comp_issue.get("tags", []) messages = comp_issue.get("message", []) - if len(messages) > 0: - full_description += "

Messages:

" - for m in messages: - full_description += "

{}

".format(str(m).rstrip()) + + nodes = comp_issue.get("nodes", []) + workloads = comp_issue.get("workloads", []) + images = comp_issue.get("images", []) + platforms = comp_issue.get("platforms", []) + + # the same information is saved as Endpoint(s), however, DefectDojo + # Endpoint lacks many metadata fields, thus, difficult to read. + affected_systems = "" + + for asset in nodes: + display_name = asset.get("display_name", "") + domains = ','.join(asset.get("domains", [])) + affected_systems += f"*Node {display_name}*\n" + affected_systems += f" domains: {domains}\n" + + for asset in platforms: + display_name = asset.get("display_name", "") + domains = ','.join(asset.get("domains", [])) + affected_systems += f"*Platform {display_name}*\n" + affected_systems += f" domains: {domains}\n" + + for asset in images: + display_name = asset.get("display_name", "") + domains = ','.join(asset.get("domains", [])) + affected_systems += f"*Image {display_name}*\n" + affected_systems += f" domains: {domains}\n" + + for asset in workloads: + display_name = asset.get("display_name", "") + domains = ','.join(asset.get("domains", [])) + service = asset.get("service", "") + image = asset.get("image", "") + affected_systems += f"*Workload {display_name}*\n" + affected_systems += f" domains: {domains}\n" + affected_systems += f" service: {service}\n" + affected_systems += f" image: {image}\n" + + full_description = COMPLIANCE_ASSET_FINDING_DESCRIPTION_TEMPLATE.format( + name=f"{test_name} ({category}), {test_profile}, {test_group}", + description=test_description, + severity=test_severity, + mitigation=mitigation, + tags=';'.join(tags), + message="\n".join(messages), + affected_systems=affected_systems, + ) finding = Finding( - title="{name} - {desc}".format(name=test_name, desc=test_description), + title=textwrap.shorten(f"{test_name} - {test_description}", width=64, placeholder="..."), test=test, description=full_description, - severity=convert_severity(test_severity), + severity=severity, mitigation=mitigation, - vuln_id_from_tool="{category}_{name}".format(category=category, name=test_name), + vuln_id_from_tool=vuln_id_from_tool, impact="", static_finding=True, dynamic_finding=False, ) - finding.unsaved_vulnerability_ids = [] + finding.unsaved_vulnerability_ids = [vuln_id_from_tool] finding.unsaved_endpoints = [] - nodes = comp_issue.get("nodes", []) - for node in nodes: - endpoint = Endpoint( - host=node.get("display_name", ""), - ) - finding.unsaved_endpoints.append(endpoint) + for asset in nodes: + endpoints = endpoints_from_asset("node", asset) + finding.unsaved_endpoints += endpoints + + for asset in workloads: + endpoints = endpoints_from_asset("workload", asset) + finding.unsaved_endpoints += endpoints + + for asset in images: + endpoints = endpoints_from_asset("image", asset) + finding.unsaved_endpoints += endpoints + + for asset in platforms: + endpoints = endpoints_from_asset("platform", asset) + finding.unsaved_endpoints += endpoints return finding +def endpoints_from_asset(kind, asset): + endpoints = [] + + # usually, there is only one namespace (domain, as NeuVector name it) + namespaces = asset.get("domains", []) + + name = asset.get("display_name", "") + + if kind == "workload": + # only workload assets have 'service' field + service = asset.get("service", "unknown_service") + name += f"/{service}" + + # in principle, naming follows the approach chosen for trivy parser + endpoints.append(Endpoint( + # host needs to comply with domain name syntax, we just expect that + # there will be only one namespace + host='-'.join(namespaces), + # we abuse path to have as much details as possible + path=f"{kind}/{name}", + )) + + # if it is a workload and it has an associated image, add image as a + # separate endpoint + if kind == "workload" and asset.get("image", "") != "": + image = asset.get("image", "unknown_image") + # image value example: + # someregistry.com/bitnami/postgresql:11.21.0-debian-11-r58 + artifact_and_tag = image.split("/")[-1] + # extracting only image name, without tag or digest + artifact_name = artifact_and_tag.split("@")[0] + artifact_name = artifact_name.split(":")[0] + + endpoints.append(Endpoint( + host=f"{artifact_name}", + path=f"{image}", + )) + + return endpoints + # see neuvector/share/clus_apis.go def convert_severity(severity): if severity.lower() == "high":