From c9df8dfbf4f5547fbabdbcd4c0fb1722e4e5d64f Mon Sep 17 00:00:00 2001 From: Fede Barcelona Date: Thu, 19 Nov 2020 18:33:00 +0100 Subject: [PATCH] feat: Add get_image_scanning_results method to SdScanningClient (#171) This method will allow to retrieve the scanning results for all the policies, or just the specified one. --- sdcclient/_scanning.py | 80 +++++++++++++++++++ .../secure/scanning/policy_evaluation_spec.py | 60 ++++++++++++++ 2 files changed, 140 insertions(+) create mode 100644 specs/secure/scanning/policy_evaluation_spec.py diff --git a/sdcclient/_scanning.py b/sdcclient/_scanning.py index f9cfad28..93f32c30 100644 --- a/sdcclient/_scanning.py +++ b/sdcclient/_scanning.py @@ -2,8 +2,10 @@ import json import re import time +from datetime import datetime from warnings import warn +from requests.exceptions import RetryError from requests_toolbelt.multipart.encoder import MultipartEncoder try: @@ -1224,3 +1226,81 @@ def download_cve_report_csv(self, vuln_type="os", scope_type="static"): return [False, self.lasterr] return [True, res.content.decode("utf-8")] + + def get_image_scanning_results(self, image_name, policy_id=None): + ''' + Args: + image_name (str): Image name to retrieve the scanning results from + policy_id (str): Policy ID to check against. If not specified, will check against all policies. + + Returns: + A tuple of (bool, str). + The first parameter, if true, means that the result is correct, while + if false, means that there's been an error. The second parameter + will hold the response of the API call. + ''' + try: + ok, res = self.get_image(image_name) + if not ok: + return ok, res + + image_digest = res[0]["imageDigest"] + image_tag = res[0]["image_detail"][0]["fulltag"] + except RetryError: + return [False, "could not retrieve image digest for the given image name, " + "ensure that the image has been scanned"] + + url = f"{self.url}/api/scanning/v1/images/{image_digest}/policyEvaluation" + params = { + "tag": image_tag, + } + + res = self.http.get(url, headers=self.hdrs, params=params, verify=self.ssl_verify) + if not self._checkResponse(res): + return [False, self.lasterr] + + json_res = res.json() + + result = { + "image_digest": json_res["imageDigest"], + "image_id": json_res["imageId"], + "status": json_res["status"], + "image_tag": image_tag, + "total_stop": json_res["nStop"], + "total_warn": json_res["nWarn"], + "last_evaluation": datetime.utcfromtimestamp(json_res["at"]), + "policy_id": "*", + "policy_name": "All policies", + "warn_results": [], + "stop_results": [] + } + + if policy_id: + policy_results = [result for result in json_res["results"] if result["policyId"] == policy_id] + if policy_results: + filtered_result_by_policy_id = policy_results[0] + result["total_stop"] = filtered_result_by_policy_id["nStop"] + result["total_warn"] = filtered_result_by_policy_id["nWarn"] + result["warn_results"] = [rule_result["checkOutput"] + for gate_result in filtered_result_by_policy_id["gateResults"] + for rule_result in gate_result["ruleResults"] + if rule_result["gateAction"] == "warn"] + result["stop_results"] = [rule_result["checkOutput"] + for gate_result in filtered_result_by_policy_id["gateResults"] + for rule_result in gate_result["ruleResults"] + if rule_result["gateAction"] == "stop"] + else: + return [False, "the specified policy ID doesn't exist"] + else: + result["warn_results"] = [rule_result["checkOutput"] + for result in json_res["results"] + for gate_result in result["gateResults"] + for rule_result in gate_result["ruleResults"] + if rule_result["gateAction"] == "warn"] + result["stop_results"] = [rule_result["checkOutput"] + for result in json_res["results"] + for gate_result in result["gateResults"] + for rule_result in gate_result["ruleResults"] + if rule_result["gateAction"] == "stop"] + + return [True, result] diff --git a/specs/secure/scanning/policy_evaluation_spec.py b/specs/secure/scanning/policy_evaluation_spec.py new file mode 100644 index 00000000..2010589b --- /dev/null +++ b/specs/secure/scanning/policy_evaluation_spec.py @@ -0,0 +1,60 @@ +import os +from datetime import datetime + +from expects import equal, expect, be_empty, be_above_or_equal, be_an, have_keys, not_ +from mamba import before, context, description, it + +from sdcclient import SdScanningClient +from specs import be_successful_api_call + +with description("Policy Evaluation", "integration") as self: + with before.all: + self.client = SdScanningClient(sdc_url=os.getenv("SDC_SECURE_URL", "https://secure.sysdig.com"), + token=os.getenv("SDC_SECURE_TOKEN")) + + with it("is able to retrieve the results for all the policies"): + image_name = "alpine:latest" + ok, res = self.client.get_image_scanning_results(image_name) + + expect((ok, res)).to(be_successful_api_call) + expect(res).to( + have_keys("image_digest", "image_id", "stop_results", + total_warn=be_above_or_equal(0), total_stop=be_above_or_equal(0), + last_evaluation=be_an(datetime), + status="pass", image_tag="docker.io/alpine:latest", + policy_id="*", policy_name="All policies", + warn_results=not_(be_empty)) + ) + + with it("is able to retrieve the results for the default policy"): + image_name = "alpine:latest" + policy_id = "default" + ok, res = self.client.get_image_scanning_results(image_name, policy_id) + + expect((ok, res)).to(be_successful_api_call) + expect(res).to( + have_keys("image_digest", "image_id", "stop_results", + total_warn=be_above_or_equal(0), total_stop=be_above_or_equal(0), + last_evaluation=be_an(datetime), + status="pass", image_tag="docker.io/alpine:latest", + policy_id="*", policy_name="All policies", + warn_results=not_(be_empty)) + ) + + with context("but the image has not been scanned yet"): + with it("returns an error saying that the image has not been found"): + image_name = "unknown_image" + ok, res = self.client.get_image_scanning_results(image_name) + + expect((ok, res)).to_not(be_successful_api_call) + expect(res).to(equal("could not retrieve image digest for the given image name, " + "ensure that the image has been scanned")) + + with context("but the provided policy id does not exist"): + with it("returns an error saying that the policy id is not found"): + image_name = "alpine" + policy_id = "unknown_policy_id" + ok, res = self.client.get_image_scanning_results(image_name, policy_id) + + expect((ok, res)).to_not(be_successful_api_call) + expect(res).to(equal("the specified policy ID doesn't exist"))