diff --git a/checkov/ansible/utils.py b/checkov/ansible/utils.py
index 67018ac2e3b..de3b0e5dff9 100644
--- a/checkov/ansible/utils.py
+++ b/checkov/ansible/utils.py
@@ -8,6 +8,7 @@
 from checkov.ansible.graph_builder.graph_components.resource_types import ResourceType
 from checkov.common.parsers.yaml.parser import parse
 from checkov.common.util.consts import START_LINE, END_LINE
+from checkov.common.util.file_utils import read_file_with_any_encoding
 from checkov.common.util.suppression import collect_suppressions_for_context
 
 TASK_NAME_PATTERN = re.compile(r"^\s*-\s+name:\s+", re.MULTILINE)
@@ -76,7 +77,7 @@ def get_relevant_file_content(file_path: str | Path) -> str | None:
     if not str(file_path).endswith((".yaml", ".yml")):
         return None
 
-    content = Path(file_path).read_text()
+    content = read_file_with_any_encoding(file_path=file_path)
     match_task_name = re.search(TASK_NAME_PATTERN, content)
     if match_task_name:
         # there are more files, which belong to an ansible playbook,
diff --git a/checkov/argo_workflows/runner.py b/checkov/argo_workflows/runner.py
index 7ff99b0d9b7..55c8be04ce0 100644
--- a/checkov/argo_workflows/runner.py
+++ b/checkov/argo_workflows/runner.py
@@ -1,11 +1,11 @@
 from __future__ import annotations
 
 import re
-from pathlib import Path
 from typing import TYPE_CHECKING, Any
 
 from checkov.common.images.image_referencer import ImageReferencer, Image
 from checkov.common.output.report import CheckType
+from checkov.common.util.file_utils import read_file_with_any_encoding
 from checkov.yaml_doc.runner import Runner as YamlRunner
 
 # Import of the checks registry for a specific resource type
@@ -44,7 +44,7 @@ def _get_workflow_file_content(self, file_path: str) -> str | None:
         if not file_path.endswith((".yaml", ".yml")):
             return None
 
-        content = Path(file_path).read_text()
+        content = read_file_with_any_encoding(file_path=file_path)
         match_api = re.search(API_VERSION_PATTERN, content)
         if match_api:
             match_kind = re.search(KIND_PATTERN, content)
diff --git a/checkov/arm/parser/parser.py b/checkov/arm/parser/parser.py
index 6adac088cef..04c2858dc90 100644
--- a/checkov/arm/parser/parser.py
+++ b/checkov/arm/parser/parser.py
@@ -4,13 +4,12 @@
 from pathlib import Path
 from typing import Any
 
-from charset_normalizer import from_path
 from yaml.scanner import ScannerError
 from yaml import YAMLError
 
 from checkov.common.parsers.json import parse as json_parse
 from checkov.common.parsers.yaml import loader
-
+from checkov.common.util.file_utils import read_file_with_any_encoding
 
 LOGGER = logging.getLogger(__name__)
 
@@ -57,13 +56,7 @@ def load(filename: Path | str) -> tuple[dict[str, Any], list[tuple[int, str]]]:
     """
     Load the given JSON/YAML file
     """
-    file_path = filename if isinstance(filename, Path) else Path(filename)
-
-    try:
-        content = file_path.read_text()
-    except UnicodeDecodeError:
-        logging.debug(f"Encoding for file {file_path} is not UTF-8, trying to detect it")
-        content = str(from_path(file_path).best())
+    content = read_file_with_any_encoding(file_path=filename)
 
     if not all(key in content for key in ("$schema", "contentVersion")):
         return {}, []
diff --git a/checkov/bicep/parser.py b/checkov/bicep/parser.py
index c075b914fd3..a0b8a4664bb 100644
--- a/checkov/bicep/parser.py
+++ b/checkov/bicep/parser.py
@@ -8,6 +8,8 @@
 
 from pycep import BicepParser
 
+from checkov.common.util.file_utils import read_file_with_any_encoding
+
 if TYPE_CHECKING:
     from pycep.typing import BicepJson
 
@@ -18,7 +20,7 @@ def __init__(self) -> None:
 
     def parse(self, file_path: Path) -> tuple[BicepJson, list[tuple[int, str]]] | tuple[None, None]:
         try:
-            content = file_path.read_text()
+            content = read_file_with_any_encoding(file_path=file_path)
             template = self.bicep_parser.parse(text=content)
         except Exception:
             logging.debug(f"[bicep] Couldn't parse {file_path}", exc_info=True)
diff --git a/checkov/cloudformation/parser/cfn_yaml.py b/checkov/cloudformation/parser/cfn_yaml.py
index 04a2181d84f..b9da4411495 100644
--- a/checkov/cloudformation/parser/cfn_yaml.py
+++ b/checkov/cloudformation/parser/cfn_yaml.py
@@ -26,6 +26,7 @@
 from checkov.common.parsers.json.decoder import SimpleDecoder
 from checkov.common.parsers.node import StrNode, DictNode, ListNode
 from checkov.common.util.consts import MAX_IAC_FILE_SIZE
+from checkov.common.util.file_utils import read_file_with_any_encoding
 
 try:
     from yaml.cyaml import CParser as Parser  # type:ignore[attr-defined]
@@ -253,11 +254,7 @@ def load(filename: str | Path, content_type: ContentType) -> tuple[dict[str, Any
             LOGGER.error(f"Encoding for file {file_path} could not be detected or read. Please try encoding the file as UTF-8.")
             raise e
     else:
-        try:
-            content = file_path.read_text()
-        except UnicodeDecodeError:
-            LOGGER.info(f"Encoding for file {file_path} is not UTF-8, trying to detect it")
-            content = str(from_path(file_path).best())
+        content = read_file_with_any_encoding(file_path=file_path)
 
     if content_type == ContentType.CFN and "Resources" not in content:
         logging.debug(f'File {file_path} is expected to be a CFN template but has no Resources attribute')
diff --git a/checkov/common/parsers/json/__init__.py b/checkov/common/parsers/json/__init__.py
index 0f67001ba9c..c519587e529 100644
--- a/checkov/common/parsers/json/__init__.py
+++ b/checkov/common/parsers/json/__init__.py
@@ -9,10 +9,9 @@
 from pathlib import Path
 from typing import Any
 
-from charset_normalizer import from_path
-
 from checkov.common.parsers.json.decoder import Decoder
 from checkov.common.parsers.json.errors import DecodeError
+from checkov.common.util.file_utils import read_file_with_any_encoding
 
 LOGGER = logging.getLogger(__name__)
 
@@ -24,13 +23,8 @@ def load(
     """
     Load the given JSON file
     """
-    try:
-        if not content:
-            file_path = filename if isinstance(filename, Path) else Path(filename)
-            content = file_path.read_text()
-    except UnicodeDecodeError:
-        LOGGER.info(f"Encoding for file {filename} is not UTF-8, trying to detect it")
-        content = str(from_path(filename).best())  # type:ignore[arg-type]  # somehow str is not recognized as PathLike
+    if not content:
+        content = read_file_with_any_encoding(file_path=filename)
 
     file_lines = [(idx + 1, line) for idx, line in enumerate(content.splitlines(keepends=True))]
 
diff --git a/checkov/common/parsers/yaml/loader.py b/checkov/common/parsers/yaml/loader.py
index 468f4aa2b36..ef8e4fcb728 100644
--- a/checkov/common/parsers/yaml/loader.py
+++ b/checkov/common/parsers/yaml/loader.py
@@ -7,6 +7,8 @@
 import yaml
 from yaml.loader import SafeLoader
 
+from checkov.common.util.file_utils import read_file_with_any_encoding
+
 if TYPE_CHECKING:
     from yaml import MappingNode
 
@@ -31,8 +33,7 @@ def load(filename: str | Path, content: str | None = None) -> tuple[list[dict[st
     """
 
     if not content:
-        file_path = filename if isinstance(filename, Path) else Path(filename)
-        content = file_path.read_text()
+        content = read_file_with_any_encoding(file_path=filename)
 
     file_lines = [(idx + 1, line) for idx, line in enumerate(content.splitlines(keepends=True))]
 
diff --git a/checkov/common/util/file_utils.py b/checkov/common/util/file_utils.py
index adc98027645..81d5ca17231 100644
--- a/checkov/common/util/file_utils.py
+++ b/checkov/common/util/file_utils.py
@@ -1,12 +1,18 @@
+from __future__ import annotations
+
 import os.path
 import tarfile
 import base64
 import gzip
 import io
 import logging
-
+from pathlib import Path
 from zipfile import ZipFile
 
+from charset_normalizer import from_path
+
+logger = logging.getLogger(__name__)
+
 
 def convert_to_unix_path(path: str) -> str:
     return path.replace('\\', '/')
@@ -89,3 +95,17 @@ def get_file_size_safe(file_path: str) -> int:
             extra={"file_path": file_path}
         )
         return -1
+
+
+def read_file_with_any_encoding(file_path: str | Path) -> str:
+    """Read the file with the system encoding and then try to detect it"""
+
+    file_path = file_path if isinstance(file_path, Path) else Path(file_path)
+
+    try:
+        content = file_path.read_text()
+    except UnicodeDecodeError:
+        logger.info(f"Encoding for file {file_path} is not UTF-8, trying to detect it")
+        content = str(from_path(file_path).best())
+
+    return content
diff --git a/checkov/github_actions/utils.py b/checkov/github_actions/utils.py
index b452385c3f5..84c871f9422 100644
--- a/checkov/github_actions/utils.py
+++ b/checkov/github_actions/utils.py
@@ -13,6 +13,7 @@
 
 from checkov.common.parsers.yaml.loader import SafeLineLoaderGhaSchema
 from checkov.common.parsers.yaml.parser import parse
+from checkov.common.util.file_utils import read_file_with_any_encoding
 from checkov.common.util.type_forcers import force_dict
 from checkov.github_actions.graph_builder.graph_components.resource_types import ResourceType
 from checkov.github_actions.schemas import gha_schema, gha_workflow
@@ -41,7 +42,7 @@ def parse_file(
 
     if is_workflow_file(file_path):
         if not file_content:
-            file_content = file_path.read_text()
+            file_content = read_file_with_any_encoding(file_path=file_path)
 
         entity_schema = parse(filename=str(f), file_content=file_content)
 
diff --git a/checkov/kubernetes/parser/k8_json.py b/checkov/kubernetes/parser/k8_json.py
index 6ba87c07220..d20fa871e2e 100644
--- a/checkov/kubernetes/parser/k8_json.py
+++ b/checkov/kubernetes/parser/k8_json.py
@@ -6,9 +6,10 @@
 from typing import Tuple, Dict, Any, List, TYPE_CHECKING
 
 import yaml
-from charset_normalizer import from_path
 from yaml.loader import SafeLoader
 
+from checkov.common.util.file_utils import read_file_with_any_encoding
+
 if TYPE_CHECKING:
     from yaml import MappingNode
 
@@ -38,13 +39,7 @@ def load(filename: Path) -> Tuple[List[Dict[str, Any]], List[Tuple[int, str]]]:
     """
    Load the given JSON file
     """
-    file_path = filename if isinstance(filename, Path) else Path(filename)
-
-    try:
-        content = file_path.read_text()
-    except UnicodeDecodeError:
-        logger.debug(f"Encoding for file {file_path} is not UTF-8, trying to detect it")
-        content = str(from_path(file_path).best())
+    content = read_file_with_any_encoding(file_path=filename)
 
     if not all(key in content for key in ("apiVersion", "kind")):
         return [{}], []
diff --git a/checkov/kubernetes/parser/k8_yaml.py b/checkov/kubernetes/parser/k8_yaml.py
index 00a66fe12d4..c80e4e195d8 100644
--- a/checkov/kubernetes/parser/k8_yaml.py
+++ b/checkov/kubernetes/parser/k8_yaml.py
@@ -6,9 +6,10 @@
 from typing import List, Dict, Any, Tuple, TYPE_CHECKING
 
 import yaml
-from charset_normalizer import from_path
 from yaml.loader import SafeLoader
 
+from checkov.common.util.file_utils import read_file_with_any_encoding
+
 if TYPE_CHECKING:
     from yaml import MappingNode
 
@@ -34,13 +35,7 @@ def load(filename: Path) -> Tuple[List[Dict[str, Any]], List[Tuple[int, str]]]:
     """
     Load the given YAML file
     """
-    file_path = filename if isinstance(filename, Path) else Path(filename)
-
-    try:
-        content = file_path.read_text()
-    except UnicodeDecodeError:
-        logging.debug(f"Encoding for file {file_path} is not UTF-8, trying to detect it")
-        content = str(from_path(file_path).best())
+    content = read_file_with_any_encoding(file_path=filename)
 
     if not all(key in content for key in ("apiVersion", "kind")):
         return [{}], []
diff --git a/checkov/openapi/runner.py b/checkov/openapi/runner.py
index 3a50c09167d..880c49bee2b 100644
--- a/checkov/openapi/runner.py
+++ b/checkov/openapi/runner.py
@@ -6,6 +6,7 @@
 
 from checkov.common.checks.base_check_registry import BaseCheckRegistry
 from checkov.common.bridgecrew.check_type import CheckType
+from checkov.common.util.file_utils import read_file_with_any_encoding
 from checkov.yaml_doc.runner import Runner as YamlRunner
 from checkov.json_doc.runner import Runner as JsonRunner
 from pathlib import Path
@@ -96,8 +97,7 @@ def get_resource(self, file_path: str, key: str, supported_entities: Iterable[st
         return ",".join(supported_entities)
 
     def load_file(self, filename: str | Path) -> str:
-        file_path = filename if isinstance(filename, Path) else Path(filename)
-        content = file_path.read_text()
+        content = read_file_with_any_encoding(file_path=filename)
         return content
 
     def pre_validate_file(self, file_content: str) -> bool:
diff --git a/checkov/terraform_json/parser.py b/checkov/terraform_json/parser.py
index 9ca05f2b11b..44dda3a6ac2 100644
--- a/checkov/terraform_json/parser.py
+++ b/checkov/terraform_json/parser.py
@@ -5,13 +5,13 @@
 from pathlib import Path
 from typing import Any
 
-from charset_normalizer import from_path
 from yaml.scanner import ScannerError
 from yaml import YAMLError
 
 from checkov.common.parsers.json import parse as json_parse
 from checkov.common.parsers.yaml import loader
 from checkov.common.util.consts import LINE_FIELD_NAMES
+from checkov.common.util.file_utils import read_file_with_any_encoding
 from checkov.terraform.graph_builder.graph_components.block_types import BlockType
 
 COMMENT_FIELD_NAME = "//"
@@ -61,11 +61,7 @@ def parse(file_path: Path) -> tuple[dict[str, Any], list[tuple[int, str]]] | tup
 
 def loads(file_path: Path) -> tuple[dict[str, Any], list[tuple[int, str]]]:
     """Loads the given JSON file with line numbers"""
-    try:
-        content = file_path.read_text()
-    except UnicodeDecodeError:
-        logging.debug(f"Encoding for file {file_path} is not UTF-8, trying to detect it")
-        content = str(from_path(file_path).best())
+    content = read_file_with_any_encoding(file_path=file_path)
 
     if not all(key in content for key in ("resource", "provider")):
         return {}, []
diff --git a/tests/ansible/examples/k8s_utf16.yaml b/tests/ansible/examples/k8s_utf16.yaml
new file mode 100644
index 00000000000..389581aacee
Binary files /dev/null and b/tests/ansible/examples/k8s_utf16.yaml differ
diff --git a/tests/ansible/test_runner.py b/tests/ansible/test_runner.py
index 85868bc1304..5e0a387c0db 100644
--- a/tests/ansible/test_runner.py
+++ b/tests/ansible/test_runner.py
@@ -392,3 +392,19 @@ def test_get_resource_without_name(graph_connector):
 
     # then
     assert new_key == "tasks.amazon.aws.ec2_instance.unknown"
+
+
+def test_runner_process_utf16_file():
+    # given
+    test_file = EXAMPLES_DIR / "k8s_utf16.yaml"
+
+    # when
+    report = Runner().run(root_folder="", files=[str(test_file)])
+
+    # then
+    summary = report.get_summary()
+
+    assert summary["passed"] == 0
+    assert summary["failed"] == 0
+    assert summary["skipped"] == 0
+    assert summary["parsing_errors"] == 0