add support for windows 10 for aiohttp
adam varsano committed Sep 1, 2024
1 parent 4aeeeba commit b3ebd09
Showing 1 changed file with 58 additions and 27 deletions.
85 changes: 58 additions & 27 deletions checkov/common/bridgecrew/platform_integration.py
@@ -1,9 +1,11 @@
 from __future__ import annotations
 
+import asyncio
 import json
 import logging
 import os.path
 import re
+import sys
 import uuid
 from collections import namedtuple
 from concurrent import futures
@@ -37,7 +39,8 @@
     enrich_and_persist_checks_metadata, checkov_results_prefix, persist_run_metadata, _put_json_object, \
     persist_graphs, persist_resource_subgraph_maps, persist_reachability_results, \
     persist_multiple_logs_stream
-from checkov.common.models.consts import SAST_SUPPORTED_FILE_EXTENSIONS, SUPPORTED_FILE_EXTENSIONS, SUPPORTED_FILES, SCANNABLE_PACKAGE_FILES
+from checkov.common.models.consts import SAST_SUPPORTED_FILE_EXTENSIONS, SUPPORTED_FILE_EXTENSIONS, SUPPORTED_FILES, \
+    SCANNABLE_PACKAGE_FILES
 from checkov.common.runners.base_runner import filter_ignored_paths
 from checkov.common.sast.consts import SastLanguages, CDK_FRAMEWORK_PREFIX
 from checkov.common.typing import _CicdDetails, LibraryGraph
@@ -188,7 +191,8 @@ def init_instance(self, platform_integration_data: dict[str, Any]) -> None:
         self.use_s3_integration = platform_integration_data["use_s3_integration"]
         self.setup_api_urls()
         # 'mypy' doesn't like, when you try to override an instance method
-        self.get_auth_token = MethodType(lambda _=None: platform_integration_data["get_auth_token"], self)  # type:ignore[method-assign]
+        self.get_auth_token = MethodType(lambda _=None: platform_integration_data["get_auth_token"],
+                                         self)  # type:ignore[method-assign]
 
     def generate_instance_data(self) -> dict[str, Any]:
         """This output is used to re-initialize the instance and should be kept in sync with 'init_instance()'"""
@@ -330,7 +334,8 @@ def setup_http_manager(self, ca_certificate: str | None = None, no_cert_verify:
                 os.environ['https_proxy'],
                 cert_reqs=cert_reqs,
                 ca_certs=ca_certificate,
-                proxy_headers=urllib3.make_headers(proxy_basic_auth=parsed_url.auth),  # type:ignore[no-untyped-call]
+                proxy_headers=urllib3.make_headers(proxy_basic_auth=parsed_url.auth),
+                # type:ignore[no-untyped-call]
                 timeout=self.http_timeout,
                 retries=self.http_retry,
             )
@@ -349,7 +354,8 @@ def setup_http_manager(self, ca_certificate: str | None = None, no_cert_verify:
             self.http = urllib3.ProxyManager(
                 os.environ['https_proxy'],
                 cert_reqs=cert_reqs,
-                proxy_headers=urllib3.make_headers(proxy_basic_auth=parsed_url.auth),  # type:ignore[no-untyped-call]
+                proxy_headers=urllib3.make_headers(proxy_basic_auth=parsed_url.auth),
+                # type:ignore[no-untyped-call]
                 timeout=self.http_timeout,
                 retries=self.http_retry,
             )
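
Both proxy hunks above are pure line re-wraps; the underlying pattern is urllib3's ProxyManager with a basic-auth Proxy-Authorization header. A minimal standalone sketch, assuming a hypothetical proxy URL in https_proxy (host and credentials are placeholders):

import os

import urllib3

# e.g. export https_proxy=http://user:pass@proxy.example.com:8080  (placeholder)
proxy_url = os.environ.get('https_proxy', 'http://user:pass@proxy.example.com:8080')
parsed_url = urllib3.util.parse_url(proxy_url)

http = urllib3.ProxyManager(
    proxy_url,
    # parse_url(...).auth holds the 'user:pass' part; make_headers() encodes it
    # into a Proxy-Authorization header for the proxied request
    proxy_headers=urllib3.make_headers(proxy_basic_auth=parsed_url.auth),
    timeout=urllib3.Timeout(connect=5.0, read=30.0),
    retries=urllib3.Retry(total=3),
)
print(http.request('GET', 'https://example.com').status)
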
@@ -359,8 +365,15 @@ def setup_http_manager(self, ca_certificate: str | None = None, no_cert_verify:
                 timeout=self.http_timeout,
                 retries=self.http_retry,
             )
+        self.config_http_manager()
         logging.debug('Successfully set up HTTP manager')
 
+    @staticmethod
+    def config_http_manager():
+        # on windows aiodns needs SelectorEventLoop
+        if sys.platform == 'win32':
+            asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
+
     def setup_bridgecrew_credentials(
             self,
             repo_id: str,
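
The hunk above is the substantive change in this commit. On Windows, Python 3.8+ defaults asyncio to the ProactorEventLoop, but aiodns (the async DNS resolver aiohttp can use) relies on loop.add_reader()/add_writer(), which only the SelectorEventLoop implements, so aiohttp requests can fail unless the policy is switched first. A minimal sketch of the fix in isolation (the URL is arbitrary):

import asyncio
import sys

import aiohttp


async def fetch_status(url: str) -> int:
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return response.status


if sys.platform == 'win32':
    # Without this, aiodns fails on Windows: the default ProactorEventLoop
    # lacks the add_reader()/add_writer() APIs that pycares/aiodns needs.
    asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())

print(asyncio.run(fetch_status('https://example.com')))

Wrapping the policy switch in a small static method and calling it from setup_http_manager() keeps the platform-specific workaround in one place instead of scattering sys.platform checks through the callers.
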
@@ -514,8 +527,9 @@ def get_s3_role(self, repo_id: str) -> tuple[str, str, dict[str, Any]] | tuple[N
                 tries += 1
                 response = self._get_s3_creds(repo_id, token)
             else:
-                logging.error('Checkov got an unexpected error that may be due to backend issues. The scan will continue, '
-                              'but results will not be sent to the platform. Please contact support for assistance.')
+                logging.error(
+                    'Checkov got an unexpected error that may be due to backend issues. The scan will continue, '
+                    'but results will not be sent to the platform. Please contact support for assistance.')
                 logging.error(f'Error from platform: {response.get("message") or response.get("Message")}')
                 self.s3_setup_failed = True
                 return None, None, response
@@ -592,7 +606,8 @@ def persist_repository(
                     continue
                 full_file_path = os.path.join(root_path, file_path)
                 relative_file_path = os.path.relpath(full_file_path, root_dir)
-                if file_extension in SUPPORTED_FILE_EXTENSIONS or file_path in SUPPORTED_FILES or is_dockerfile(file_path):
+                if file_extension in SUPPORTED_FILE_EXTENSIONS or file_path in SUPPORTED_FILES or is_dockerfile(
+                        file_path):
                     files_to_persist.append(FileToPersist(full_file_path, relative_file_path))
                 if sast_languages:
                     for framwork in sast_languages:
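
The wrapped condition belongs to a directory walk that decides which files get uploaded to the platform. Roughly, the selection logic looks like this (the extension and filename sets are stand-ins for checkov's real consts, and the FileToPersist tuple is simplified):

import os
from collections import namedtuple

FileToPersist = namedtuple('FileToPersist', ['full_file_path', 's3_file_key'])

SUPPORTED_FILE_EXTENSIONS = {'.tf', '.yml', '.yaml', '.json'}  # stand-in values
SUPPORTED_FILES = {'Dockerfile', 'package.json'}  # stand-in values


def collect_files_to_persist(root_dir: str) -> list[FileToPersist]:
    files_to_persist = []
    for root_path, _, file_names in os.walk(root_dir):
        for file_path in file_names:
            _, file_extension = os.path.splitext(file_path)
            full_file_path = os.path.join(root_path, file_path)
            relative_file_path = os.path.relpath(full_file_path, root_dir)
            if file_extension in SUPPORTED_FILE_EXTENSIONS or file_path in SUPPORTED_FILES:
                files_to_persist.append(FileToPersist(full_file_path, relative_file_path))
    return files_to_persist
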
@@ -629,7 +644,8 @@ def adjust_sast_match_location_path(self, match: Match) -> None:
                 match.location.path = match.location.path.replace(os.path.abspath(dir), self.repo_path)  # type: ignore
                 if match.metadata.code_locations:
                     for code_location in match.metadata.code_locations:
-                        code_location.path = code_location.path.replace(os.path.abspath(dir), self.repo_path)  # type: ignore
+                        code_location.path = code_location.path.replace(os.path.abspath(dir),
+                                                                        self.repo_path)  # type: ignore
 
                 if match.metadata.taint_mode and match.metadata.taint_mode.data_flow:
                     for df in match.metadata.taint_mode.data_flow:
@@ -640,10 +656,12 @@ def adjust_sast_match_location_path(self, match: Match) -> None:
             for file in self.scan_file:
                 if match.location.path == os.path.abspath(file):
                     file_dir = '/'.join(match.location.path.split('/')[0:-1])
-                    match.location.path = match.location.path.replace(os.path.abspath(file_dir), self.repo_path)  # type: ignore
+                    match.location.path = match.location.path.replace(os.path.abspath(file_dir),
+                                                                      self.repo_path)  # type: ignore
                     if match.metadata.code_locations:
                         for code_location in match.metadata.code_locations:
-                            code_location.path = code_location.path.replace(os.path.abspath(file_dir), self.repo_path)  # type: ignore
+                            code_location.path = code_location.path.replace(os.path.abspath(file_dir),
+                                                                            self.repo_path)  # type: ignore
 
                     if match.metadata.taint_mode and match.metadata.taint_mode.data_flow:
                         for df in match.metadata.taint_mode.data_flow:
@@ -780,7 +798,8 @@ def persist_reachability_scan_results(self, reachability_report: Optional[Dict[s
         for lang, report in reachability_report.items():
             persist_reachability_results(f'sast_{lang}', {lang: report}, self.s3_client, self.bucket, self.repo_path)
 
-    def persist_image_scan_results(self, report: dict[str, Any] | None, file_path: str, image_name: str, branch: str) -> None:
+    def persist_image_scan_results(self, report: dict[str, Any] | None, file_path: str, image_name: str,
+                                   branch: str) -> None:
         if not self.s3_client:
             logging.error("S3 upload was not correctly initialized")
             return
@@ -832,7 +851,8 @@ def persist_run_metadata(self, run_metadata: dict[str, str | list[str]]) -> None
             return
         persist_run_metadata(run_metadata, self.s3_client, self.bucket, self.repo_path, True)
         if self.support_bucket and self.support_repo_path:
-            logging.debug(f'Also uploading run_metadata.json to support location: {self.support_bucket}/{self.support_repo_path}')
+            logging.debug(
+                f'Also uploading run_metadata.json to support location: {self.support_bucket}/{self.support_repo_path}')
             persist_run_metadata(run_metadata, self.s3_client, self.support_bucket, self.support_repo_path, False)
 
     def persist_all_logs_streams(self, logs_streams: Dict[str, StringIO]) -> None:
@@ -845,7 +865,8 @@ def persist_all_logs_streams(self, logs_streams: Dict[str, StringIO]) -> None:
 
         persist_multiple_logs_stream(logs_streams, self.s3_client, self.support_bucket, self.support_repo_path)
 
-    def persist_graphs(self, graphs: dict[str, list[tuple[LibraryGraph, Optional[str]]]], absolute_root_folder: str = '') -> None:
+    def persist_graphs(self, graphs: dict[str, list[tuple[LibraryGraph, Optional[str]]]],
+                       absolute_root_folder: str = '') -> None:
         if not self.use_s3_integration or not self.s3_client or self.s3_setup_failed:
             return
         if not self.bucket or not self.repo_path:
@@ -860,7 +881,8 @@ def persist_resource_subgraph_maps(self, resource_subgraph_maps: dict[str, dict[
         if not self.bucket or not self.repo_path:
             logging.error(f"Something went wrong: bucket {self.bucket}, repo path {self.repo_path}")
             return
-        persist_resource_subgraph_maps(resource_subgraph_maps, self.s3_client, self.bucket, self.repo_path, self.persist_graphs_timeout)
+        persist_resource_subgraph_maps(resource_subgraph_maps, self.s3_client, self.bucket, self.repo_path,
+                                       self.persist_graphs_timeout)
 
     def commit_repository(self, branch: str) -> str | None:
         """
@@ -886,7 +908,8 @@ def commit_repository(self, branch: str) -> str | None:
             return None
 
         logging.debug(f'Submitting finalize upload request to {self.integrations_api_url}')
-        request = self.http.request("PUT", f"{self.integrations_api_url}?source={self.bc_source.name}",  # type:ignore[no-untyped-call]
+        request = self.http.request("PUT", f"{self.integrations_api_url}?source={self.bc_source.name}",
+                                    # type:ignore[no-untyped-call]
                                     body=json.dumps(
                                         {"path": self.repo_path, "branch": branch,
                                          "to_branch": CI_METADATA_EXTRACTOR.to_branch,
@@ -916,7 +939,8 @@ def commit_repository(self, branch: str) -> str | None:
                 self.s3_setup_failed = True
         except JSONDecodeError:
             if request:
-                logging.warning(f"Response (status: {request.status}) of {self.integrations_api_url}: {request.data.decode('utf8')}")  # danger:ignore - we won't be here if the response contains valid data
+                logging.warning(
+                    f"Response (status: {request.status}) of {self.integrations_api_url}: {request.data.decode('utf8')}")  # danger:ignore - we won't be here if the response contains valid data
             logging.error(f"Response of {self.integrations_api_url} is not a valid JSON", exc_info=True)
             self.s3_setup_failed = True
         finally:
@@ -932,7 +956,8 @@ def commit_repository(self, branch: str) -> str | None:
                 try_num += 1
                 sleep(SLEEP_SECONDS)
             else:
-                logging.error(f"Failed to finalize repository {self.repo_id} in the platform with the following error:\n{response}")
+                logging.error(
+                    f"Failed to finalize repository {self.repo_id} in the platform with the following error:\n{response}")
                 self.s3_setup_failed = True
 
         return None
@@ -967,7 +992,8 @@ def _persist_file(self, full_file_path: str, s3_file_key: str) -> None:
                     sleep(SLEEP_SECONDS)
                     curr_try += 1
                 else:
-                    logging.error(f"failed to persist file {full_file_path} into S3 bucket {self.bucket}", exc_info=True)
+                    logging.error(f"failed to persist file {full_file_path} into S3 bucket {self.bucket}",
+                                  exc_info=True)
                     logging.debug(f"file size of {full_file_path} is {os.stat(full_file_path).st_size} bytes")
                     raise
             except Exception:
@@ -1043,10 +1069,11 @@ def get_customer_run_config(self) -> None:
 
             logging.debug(f"Got customer run config from {platform_type} platform")
         except Exception as e:
-            logging.warning(f"An unexpected error occurred getting the run configuration from {self.platform_run_config_url} "
-                            "after multiple retries. Please verify your API key and Prisma API URL, and retry. If the "
-                            "problem persists, please enable debug logs and contact support. The error is: "
-                            f"{e}", exc_info=True)
+            logging.warning(
+                f"An unexpected error occurred getting the run configuration from {self.platform_run_config_url} "
+                "after multiple retries. Please verify your API key and Prisma API URL, and retry. If the "
+                "problem persists, please enable debug logs and contact support. The error is: "
+                f"{e}", exc_info=True)
             raise
 
     def get_reachability_run_config(self) -> Union[Dict[str, Any], None]:
@@ -1182,7 +1209,8 @@ def get_prisma_policies_for_filter(self, policy_filter: str) -> dict[Any, Any] |
         except Exception:
             response_message = f': {request.status} - {request.reason}' if request else ''
             logging.warning(
-                f"Failed to get prisma build policy metadata from {self.prisma_policies_url}{response_message}", exc_info=True)
+                f"Failed to get prisma build policy metadata from {self.prisma_policies_url}{response_message}",
+                exc_info=True)
         return filtered_policies
 
     @staticmethod
@@ -1219,11 +1247,13 @@ def get_prisma_policy_filters(self) -> Dict[str, Dict[str, Any]]:
         except Exception:
             response_message = f': {request.status} - {request.reason}' if request else ''
             logging.warning(
-                f"Failed to get prisma build policy metadata from {self.prisma_policy_filters_url}{response_message}", exc_info=True)
+                f"Failed to get prisma build policy metadata from {self.prisma_policy_filters_url}{response_message}",
+                exc_info=True)
             return {}
 
     @staticmethod
-    def is_valid_policy_filter(policy_filter: list[tuple[str, str]], valid_filters: dict[str, dict[str, Any]] | None = None) -> bool:
+    def is_valid_policy_filter(policy_filter: list[tuple[str, str]],
+                               valid_filters: dict[str, dict[str, Any]] | None = None) -> bool:
         """
         Validates only the filter names
         """
@@ -1273,8 +1303,9 @@ def get_public_run_config(self) -> None:
             platform_type = PRISMA_PLATFORM if self.is_prisma_integration() else BRIDGECREW_PLATFORM
             logging.debug(f"Got checkov mappings and guidelines from {platform_type} platform")
         except Exception:
-            logging.warning(f"Failed to get the checkov mappings and guidelines from {self.guidelines_api_url}. Skips using BC_* IDs will not work.",
-                            exc_info=True)
+            logging.warning(
+                f"Failed to get the checkov mappings and guidelines from {self.guidelines_api_url}. Skips using BC_* IDs will not work.",
+                exc_info=True)
 
     def get_report_to_platform(self, args: argparse.Namespace, scan_reports: list[Report]) -> None:
         if self.bc_api_key: