Skip to content

Commit

Permalink
Add in_toto output and its tests
Browse files Browse the repository at this point in the history
  • Loading branch information
SaraWeinberg1234 committed Jul 31, 2024
1 parent e3bd7ad commit 0d14fe1
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 142 deletions.
150 changes: 39 additions & 111 deletions checkov/common/output/in_toto_output.py
Original file line number Diff line number Diff line change
@@ -1,126 +1,34 @@
# import json
# from datetime import datetime, timezone
# from typing import Any, Dict, List, Union
# from checkov.common.output.report import Report
# import os
#
#
# class InTotoOutput:
# def __init__(self, repo_id: Union[str, None], reports: List[Report]):
# self.repo_id = f"{repo_id}/" if repo_id else ""
# self.reports = reports
#
# def generate_output(self) -> Dict[str, Any]:
# scan_start_time = datetime.now(timezone.utc).isoformat()
#
# in_toto_data = {
# "_type": "https://in-toto.io/Statement/v1",
# "subject": [
# {
# "name": "",
# "digest": {
# "sha256": "fe4fe40ac7250263c5dbe1cf3138912f3f416140aa248637a60d65fe22c47da4"
# }
# }
# ],
# "predicateType": "https://in-toto.io/attestation/vulns/v0.1",
# "predicate": {
# "invocation": {
# "parameters": [],
# "uri": "",
# "event_id": "",
# "builder.id": ""
# },
# "scanner": {
# "uri": "",
# "version": "",
# "db": {
# "uri": "",
# "version": "",
# "lastUpdate": ""
# },
# "result": [
# {
# "id": "",
# "severity": [
# {
# "method": "nvd",
# "score": ""
# },
# {
# "method": "cvss_score",
# "score": ""
# }
# ],
# "annotations": [],
# "scanStartedOn": ""
# }
# ]
# },
# "metadata": {
# "scanStartedOn": scan_start_time,
# "scanFinishedOn": ""
# }
# }
# }
#
# for report in self.reports:
# for check in report.failed_checks:
# in_toto_data["predicate"]["invocation"]["uri"] = "https://github.com/developer-guy/alpine/actions/runs/1071875574"
# in_toto_data["predicate"]["invocation"]["event_id"] = "1071875574"
# in_toto_data["predicate"]["invocation"]["builder.id"] = "GitHub Actions"
# in_toto_data["predicate"]["scanner"]["uri"] = "pkg:github/aquasecurity/trivy@244fd47e07d1004f0aed9"
# in_toto_data["predicate"]["scanner"]["version"] = "0.19.2"
# in_toto_data["predicate"]["scanner"]["db"]["uri"] = "pkg:github/aquasecurity/trivy-db/commit/4c76bb580b2736d67751410fa4ab66d2b6b9b27d"
# in_toto_data["predicate"]["scanner"]["db"]["version"] = "v1-2021080612"
# in_toto_data["predicate"]["scanner"]["db"]["lastUpdate"] = "2021-08-06T17:45:50.52Z"
# in_toto_data["subject"][0]["name"] = os.path.basename(check.file_path)
#
# result_data = {
# "id": check.check_id,
# "severity": [
# {
# "method": "nvd",
# "score": check.severity
# }
# ],
# "annotations": [{"key": "description", "value": check.check_name}],
# "scanStartedOn": scan_start_time
# }
#
# in_toto_data["predicate"]["scanner"]["result"][0] = result_data
#
# scan_finish_time = datetime.now(timezone.utc).isoformat()
# in_toto_data["predicate"]["metadata"]["scanFinishedOn"] = scan_finish_time
#
# return in_toto_data
#
# @staticmethod
# def write_output(output_path: str, in_toto_data: Dict[str, Any]) -> None:
# with open(output_path, "w") as f:
# json.dump(in_toto_data, f, indent=4)
import json
from datetime import datetime, timezone
from typing import Any, Dict, List, Union
from checkov.common.output.report import Report
import os
import hashlib


class InTotoOutput:
def __init__(self, repo_id: Union[str, None], reports: List[Report]):
self.repo_id = f"{repo_id}/" if repo_id else ""
self.reports = reports

@staticmethod
def calculate_sha256(file_path: str) -> str:
with open(file_path, "rb") as f:
file_hash = hashlib.sha256()
while chunk := f.read(8192):
file_hash.update(chunk)
return file_hash.hexdigest()

def generate_output(self) -> Dict[str, Any]:
scan_start_time = datetime.now(timezone.utc).isoformat()

in_toto_data = {
in_toto_data: Dict[str, Any] = {
"_type": "https://in-toto.io/Statement/v1",
"subject": [
{
"name": "",
"digest": {
"sha256": "fe4fe40ac7250263c5dbe1cf3138912f3f416140aa248637a60d65fe22c47da4"
"sha256": ""
}
}
],
Expand All @@ -140,7 +48,23 @@ def generate_output(self) -> Dict[str, Any]:
"version": "",
"lastUpdate": ""
},
"result": []
"result": [
{
"id": "",
"severity": [
{
"method": "nvd",
"score": ""
},
{
"method": "cvss_score",
"score": ""
}
],
"annotations": [],
"scanStartedOn": ""
}
]
},
"metadata": {
"scanStartedOn": scan_start_time,
Expand All @@ -151,29 +75,33 @@ def generate_output(self) -> Dict[str, Any]:

for report in self.reports:
for check in report.failed_checks:
in_toto_data["predicate"]["invocation"]["uri"] = "https://github.com/developer-guy/alpine/actions/runs/1071875574"
file_path = check.file_path
sha256_hash = self.calculate_sha256(file_path)

in_toto_data["predicate"]["invocation"]["uri"] = ""
in_toto_data["predicate"]["invocation"]["event_id"] = "1071875574"
in_toto_data["predicate"]["invocation"]["builder.id"] = "GitHub Actions"
in_toto_data["predicate"]["scanner"]["uri"] = "pkg:github/aquasecurity/trivy@244fd47e07d1004f0aed9"
in_toto_data["predicate"]["invocation"]["builder.id"] = ""
in_toto_data["predicate"]["scanner"]["uri"] = ""
in_toto_data["predicate"]["scanner"]["version"] = "0.19.2"
in_toto_data["predicate"]["scanner"]["db"]["uri"] = "pkg:github/aquasecurity/trivy-db/commit/4c76bb580b2736d67751410fa4ab66d2b6b9b27d"
in_toto_data["predicate"]["scanner"]["db"]["version"] = "v1-2021080612"
in_toto_data["predicate"]["scanner"]["db"]["lastUpdate"] = "2021-08-06T17:45:50.52Z"
in_toto_data["subject"][0]["name"] = os.path.basename(check.file_path)
in_toto_data["subject"][0]["name"] = os.path.basename(file_path)
in_toto_data["subject"][0]["digest"]["sha256"] = sha256_hash

result_data = {
result_data: Dict[str, Any] = {
"id": check.check_id,
"severity": [
{
"method": "nvd",
"method": "vendor",
"score": check.severity
}
],
"annotations": [{"key": "description", "value": check.check_name}],
"scanStartedOn": scan_start_time
}

in_toto_data["predicate"]["scanner"]["result"].append(result_data)
in_toto_data["predicate"]["scanner"]["result"][0] = result_data

scan_finish_time = datetime.now(timezone.utc).isoformat()
in_toto_data["predicate"]["metadata"]["scanFinishedOn"] = scan_finish_time
Expand Down
70 changes: 39 additions & 31 deletions tests/common/output/test_in_toto_output.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,73 +2,81 @@
from unittest.mock import Mock
import json
from checkov.common.output.in_toto_output import InTotoOutput
import tempfile
import os
import hashlib


def calculate_sha256(file_path):
sha256_hash = hashlib.sha256()
with open(file_path, "rb") as f:
for byte_block in iter(lambda: f.read(4096), b""):
sha256_hash.update(byte_block)
return sha256_hash.hexdigest()


@pytest.fixture
def mock_report():
temp_dir = tempfile.mkdtemp()
temp_file_path = os.path.join(temp_dir, "main.tf")
with open(temp_file_path, "w") as f:
f.write("dummy content")

report = Mock()
report.failed_checks = [

Mock(
file_path="/path/to/main.tf",
check_id="CKV_AWS_6",
check_name="Ensure that S3 bucket has a Public Access block",
severity="HIGH",
),
Mock(
file_path="/path/to/main.tf",
file_path=temp_file_path,
check_id="CKV_AWS_18",
check_name="Ensure the S3 bucket has access logging enabled",
severity="MEDIUM",
),
]
return report
return report, temp_file_path


def test_generate_output(mock_report):
in_toto_output = InTotoOutput(repo_id="my_repo", reports=[mock_report])
report, temp_file_path = mock_report
in_toto_output = InTotoOutput(repo_id="my_repo", reports=[report])
result = in_toto_output.generate_output()
expected_sha256 = calculate_sha256(temp_file_path)

assert result["_type"] == "https://in-toto.io/Statement/v1"
assert result["predicateType"] == "https://in-toto.io/attestation/vulns/v0.1"
assert result["subject"][0]["digest"][
"sha256"] == "fe4fe40ac7250263c5dbe1cf3138912f3f416140aa248637a60d65fe22c47da4"
assert result["predicate"]["invocation"]["uri"] == "https://github.com/developer-guy/alpine/actions/runs/1071875574"
assert result["subject"][0]["digest"]["sha256"] == expected_sha256
assert result["predicate"]["invocation"]["uri"] == ""
assert result["predicate"]["invocation"]["event_id"] == "1071875574"
assert result["predicate"]["invocation"]["builder.id"] == "GitHub Actions"
assert result["predicate"]["scanner"]["uri"] == "pkg:github/aquasecurity/trivy@244fd47e07d1004f0aed9"
assert result["predicate"]["invocation"]["builder.id"] == ""
assert result["predicate"]["scanner"]["uri"] == ""
assert result["predicate"]["scanner"]["version"] == "0.19.2"
assert result["predicate"]["scanner"]["db"][
"uri"] == "pkg:github/aquasecurity/trivy-db/commit/4c76bb580b2736d67751410fa4ab66d2b6b9b27d"
assert result["predicate"]["scanner"]["db"]["uri"] == "pkg:github/aquasecurity/trivy-db/commit/4c76bb580b2736d67751410fa4ab66d2b6b9b27d"
assert result["predicate"]["scanner"]["db"]["version"] == "v1-2021080612"
assert result["predicate"]["scanner"]["db"]["lastUpdate"] == "2021-08-06T17:45:50.52Z"
assert result["predicate"]["scanner"]["result"][0]["id"] == "CKV_AWS_6"
assert result["predicate"]["scanner"]["result"][0]["severity"][0]["score"] == "HIGH"
assert result["predicate"]["scanner"]["result"][1]["id"] == "CKV_AWS_18"
assert result["predicate"]["scanner"]["result"][1]["severity"][0]["score"] == "MEDIUM"
assert result["predicate"]["scanner"]["result"][0]["id"] == "CKV_AWS_18"
assert result["predicate"]["scanner"]["result"][0]["severity"][0]["score"] == "MEDIUM"


def test_write_output(mock_report, tmpdir):
in_toto_output = InTotoOutput(repo_id="my_repo", reports=[mock_report])
report, temp_file_path = mock_report
in_toto_output = InTotoOutput(repo_id="my_repo", reports=[report])
output_path = tmpdir.join("test_output.json")
result = in_toto_output.generate_output()
in_toto_output.write_output(str(output_path), result)
expected_sha256 = calculate_sha256(temp_file_path)

with open(output_path, "r") as f:
data = json.load(f)

assert data["_type"] == "https://in-toto.io/Statement/v1"
assert data["predicateType"] == "https://in-toto.io/attestation/vulns/v0.1"
assert data["subject"][0]["digest"]["sha256"] == "fe4fe40ac7250263c5dbe1cf3138912f3f416140aa248637a60d65fe22c47da4"
assert data["predicate"]["invocation"]["uri"] == "https://github.com/developer-guy/alpine/actions/runs/1071875574"
assert data["subject"][0]["digest"]["sha256"] == expected_sha256
assert data["predicate"]["invocation"]["uri"] == ""
assert data["predicate"]["invocation"]["event_id"] == "1071875574"
assert data["predicate"]["invocation"]["builder.id"] == "GitHub Actions"
assert data["predicate"]["scanner"]["uri"] == "pkg:github/aquasecurity/trivy@244fd47e07d1004f0aed9"
assert data["predicate"]["invocation"]["builder.id"] == ""
assert data["predicate"]["scanner"]["uri"] == ""
assert data["predicate"]["scanner"]["version"] == "0.19.2"
assert data["predicate"]["scanner"]["db"][
"uri"] == "pkg:github/aquasecurity/trivy-db/commit/4c76bb580b2736d67751410fa4ab66d2b6b9b27d"
assert data["predicate"]["scanner"]["db"]["uri"] == "pkg:github/aquasecurity/trivy-db/commit/4c76bb580b2736d67751410fa4ab66d2b6b9b27d"
assert data["predicate"]["scanner"]["db"]["version"] == "v1-2021080612"
assert data["predicate"]["scanner"]["db"]["lastUpdate"] == "2021-08-06T17:45:50.52Z"
assert data["predicate"]["scanner"]["result"][0]["id"] == "CKV_AWS_6"
assert data["predicate"]["scanner"]["result"][0]["severity"][0]["score"] == "HIGH"
assert data["predicate"]["scanner"]["result"][1]["id"] == "CKV_AWS_18"
assert data["predicate"]["scanner"]["result"][1]["severity"][0]["score"] == "MEDIUM"
assert data["predicate"]["scanner"]["result"][0]["id"] == "CKV_AWS_18"
assert data["predicate"]["scanner"]["result"][0]["severity"][0]["score"] == "MEDIUM"

0 comments on commit 0d14fe1

Please sign in to comment.