diff --git a/.bandit b/.bandit
deleted file mode 100644
index 71bc2cc869e..00000000000
--- a/.bandit
+++ /dev/null
@@ -1,2 +0,0 @@
-[bandit]
-exclude: /test
\ No newline at end of file
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index e804ab178bb..2b6c6047dfe 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -4,7 +4,7 @@ repos:
     hooks:
       - id: debug-statements
   - repo: https://github.com/PyCQA/flake8
-    rev: 6.0.0
+    rev: 6.1.0
     hooks:
       - id: flake8
         language_version: python3.9
@@ -18,7 +18,7 @@ repos:
       - id: teyit
         language_version: python3.9
   - repo: https://github.com/rhysd/actionlint
-    rev: v1.6.24
+    rev: v1.6.25
     hooks:
       - id: actionlint-docker
        # SC2129 - Consider using { cmd1; cmd2; } >> file instead of individual redirects.
@@ -33,7 +33,7 @@ repos:
        additional_dependencies:
          - vistir<0.7.0  # can be removed, when v4.0.0 of pipenv-setup comes out
  - repo: https://github.com/seddonym/import-linter  # checks the import dependencies between each other
-    rev: v1.10.0
+    rev: v1.11.1
    hooks:
      - id: import-linter
        language_version: python3.9
diff --git a/checkov/arm/runner.py b/checkov/arm/runner.py
index 40497a668ea..3f471e33f05 100644
--- a/checkov/arm/runner.py
+++ b/checkov/arm/runner.py
@@ -5,6 +5,8 @@
 from collections.abc import Iterable
 from typing import TYPE_CHECKING, Any, cast
 
+from typing_extensions import TypeAlias  # noqa[TC002]
+
 from checkov.arm.graph_builder.local_graph import ArmLocalGraph
 from checkov.arm.graph_manager import ArmGraphManager
 from checkov.arm.registry import arm_resource_registry, arm_parameter_registry
@@ -28,8 +30,11 @@
     from checkov.common.graph.checks_infra.registry import BaseRegistry
     from checkov.common.typing import LibraryGraphConnector, _CheckResult
 
+_ArmContext: TypeAlias = "dict[str, dict[str, Any]]"
+_ArmDefinitions: TypeAlias = "dict[str, dict[str, Any]]"
+
 
-class Runner(BaseRunner[ArmGraphManager]):
+class Runner(BaseRunner[_ArmDefinitions, _ArmContext, ArmGraphManager]):
     check_type = CheckType.ARM  # noqa: CCE003  # a static attribute
 
     def __init__(
@@ -51,8 +56,9 @@ def __init__(
         self.graph_registry = get_graph_checks_registry(self.check_type)
 
         # need to check, how to support subclass differences
-        self.definitions: "dict[str, dict[str, Any]]" = {}  # type:ignore[assignment]
+        self.definitions: _ArmDefinitions = {}
         self.definitions_raw: "dict[str, list[tuple[int, str]]]" = {}
+        self.context: _ArmContext | None = None
         self.root_folder: "str | None" = None
 
     def run(
diff --git a/checkov/bicep/runner.py b/checkov/bicep/runner.py
index eee02fafedd..668ff8d5752 100644
--- a/checkov/bicep/runner.py
+++ b/checkov/bicep/runner.py
@@ -5,6 +5,8 @@
 from pathlib import Path
 from typing import cast, Type, TYPE_CHECKING, Any
 
+from typing_extensions import TypeAlias  # noqa[TC002]
+
 from checkov.bicep.graph_builder.context_definitions import build_definitions_context
 from checkov.bicep.checks.param.registry import registry as param_registry
 from checkov.bicep.checks.resource.registry import registry as resource_registry
@@ -40,8 +42,11 @@
     from pycep.typing import BicepJson
     from typing_extensions import Literal
 
+_BicepContext: TypeAlias = "dict[str, dict[str, Any]]"
+_BicepDefinitions: TypeAlias = "dict[Path, BicepJson]"
+
 
-class Runner(ImageReferencerMixin[None], BaseRunner[BicepGraphManager]):
+class Runner(ImageReferencerMixin[None], BaseRunner[_BicepDefinitions, _BicepContext, BicepGraphManager]):
     check_type = CheckType.BICEP  # noqa: CCE003  # a static attribute
 
     block_type_registries: 'dict[Literal["parameters", "resources"], BaseCheckRegistry]' = {  # noqa: CCE003  # a static attribute
@@ -66,8 +71,8 @@ def __init__(
         )
         self.graph_registry: Registry = get_graph_checks_registry(self.check_type)
 
-        self.context: dict[str, dict[str, Any]] = {}
-        self.definitions: dict[Path, BicepJson] = {}  # type:ignore[assignment]  # need to check, how to support subclass differences
+        self.context: _BicepContext = {}
+        self.definitions: _BicepDefinitions = {}
         self.definitions_raw: dict[Path, list[tuple[int, str]]] = {}  # type:ignore[assignment]
 
         self.root_folder: str | Path | None = None
diff --git a/checkov/circleci_pipelines/checks/SuspectCurlInScript.py b/checkov/circleci_pipelines/checks/SuspectCurlInScript.py
index 0b1824ab111..0f125c0eba2 100644
--- a/checkov/circleci_pipelines/checks/SuspectCurlInScript.py
+++ b/checkov/circleci_pipelines/checks/SuspectCurlInScript.py
@@ -23,7 +23,7 @@ def scan_conf(self, conf: dict[str, Any]) -> tuple[CheckResult, dict[str, Any]]:
         if "run" not in conf:
             return CheckResult.PASSED, conf
         run = conf.get("run", "")
-        if type(run) == dict:
+        if isinstance(run, dict):
             run = run.get("command", "")
         if "curl" in run:
             badstuff = ['curl', 'POST']
diff --git a/checkov/cloudformation/checks/resource/base_resource_value_check.py b/checkov/cloudformation/checks/resource/base_resource_value_check.py
index 55830481b93..831b1dfad98 100644
--- a/checkov/cloudformation/checks/resource/base_resource_value_check.py
+++ b/checkov/cloudformation/checks/resource/base_resource_value_check.py
@@ -63,7 +63,7 @@ def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult:
         # those, allowing inspected_keys in checks to use the same syntax.
         # The last value shouldn't be changed, because it could be indeed a valid number
         for i in range(0, len(match) - 1):
-            if type(match[i]) == int:
+            if type(match[i]) is int:
                 match[i] = f"[{match[i]}]"
 
         if match[:-1] == path_elements:
diff --git a/checkov/cloudformation/runner.py b/checkov/cloudformation/runner.py
index c5c06c93e4c..957c935105f 100644
--- a/checkov/cloudformation/runner.py
+++ b/checkov/cloudformation/runner.py
@@ -5,6 +5,8 @@
 import os
 from typing import Type, Any, TYPE_CHECKING
 
+from typing_extensions import TypeAlias  # noqa[TC002]
+
 from checkov.cloudformation import cfn_utils
 from checkov.cloudformation.cfn_utils import create_definitions, build_definitions_context
 from checkov.cloudformation.checks.resource.registry import cfn_registry
@@ -35,8 +37,11 @@
     from checkov.common.checks_infra.registry import Registry
     from checkov.common.images.image_referencer import Image
 
+_CloudformationContext: TypeAlias = "dict[str, dict[str, Any]]"
+_CloudformationDefinitions: TypeAlias = "dict[str, dict[str, Any]]"
+
 
-class Runner(ImageReferencerMixin[None], BaseRunner[CloudformationGraphManager]):
+class Runner(ImageReferencerMixin[None], BaseRunner[_CloudformationDefinitions, _CloudformationContext, CloudformationGraphManager]):
     check_type = CheckType.CLOUDFORMATION  # noqa: CCE003  # a static attribute
 
     def __init__(
@@ -56,8 +61,8 @@ def __init__(
             if graph_manager is not None
             else CloudformationGraphManager(source=source, db_connector=db_connector)
         )
-        self.context: "dict[str, dict[str, Any]]" = {}
-        self.definitions: "dict[str, dict[str, Any]]" = {}  # type:ignore[assignment]  # need to check, how to support subclass differences
+        self.context: _CloudformationContext = {}
+        self.definitions: _CloudformationDefinitions = {}
         self.definitions_raw: "dict[str, list[tuple[int, str]]]" = {}
         self.graph_registry: "Registry" = get_graph_checks_registry(self.check_type)
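The runner changes above all follow the same idiom: a pair of private, string-valued `TypeAlias` declarations at module level that are then reused both for the instance attributes and as `BaseRunner[...]` type arguments; the `# noqa[TC002]` marker appears to keep the `TypeAlias` import at runtime because the alias assignment lives outside a `TYPE_CHECKING` block. A minimal sketch of that idiom, with illustrative names (`_DemoContext`, `DemoRunner`) that are not part of the patch:

```python
from __future__ import annotations

from typing import Any

from typing_extensions import TypeAlias

# String-valued aliases keep the annotation lazy; this mirrors the
# _ArmDefinitions / _CloudformationContext aliases introduced above.
_DemoContext: TypeAlias = "dict[str, dict[str, Any]]"
_DemoDefinitions: TypeAlias = "dict[str, dict[str, Any]]"


class DemoRunner:
    def __init__(self) -> None:
        # The alias is reused wherever the type shows up, so a later change to
        # the definitions shape only touches the alias line.
        self.definitions: _DemoDefinitions = {}
        self.context: _DemoContext | None = None


print(DemoRunner().definitions)  # {}
```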
diff --git a/checkov/common/checks_infra/solvers/attribute_solvers/equals_attribute_solver.py b/checkov/common/checks_infra/solvers/attribute_solvers/equals_attribute_solver.py
index 17137ec0ae4..417b35731a6 100644
--- a/checkov/common/checks_infra/solvers/attribute_solvers/equals_attribute_solver.py
+++ b/checkov/common/checks_infra/solvers/attribute_solvers/equals_attribute_solver.py
@@ -9,7 +9,7 @@ class EqualsAttributeSolver(BaseAttributeSolver):
     def _get_operation(self, vertex: Dict[str, Any], attribute: Optional[str]) -> bool:
         attr_val = vertex.get(attribute)  # type:ignore[arg-type]  # due to attribute can be None
 
-        if type(attr_val) == bool or type(self.value) == bool:
+        if isinstance(attr_val, bool) or isinstance(self.value, bool):
             # handle cases like str(False) == "false"
             # generally self.value will be a string, but could be a bool if the policy was created straight from json
             return str(attr_val).lower() == str(self.value).lower()
diff --git a/checkov/common/output/record.py b/checkov/common/output/record.py
index 351c6c479e1..74627d2019f 100644
--- a/checkov/common/output/record.py
+++ b/checkov/common/output/record.py
@@ -41,7 +41,7 @@ def __init__(
         file_abs_path: str,
         entity_tags: Optional[Dict[str, str]] = None,
         caller_file_path: Optional[str] = None,
-        caller_file_line_range: Optional[Tuple[int, int]] = None,
+        caller_file_line_range: tuple[int, int] | None = None,
         bc_check_id: Optional[str] = None,
         resource_address: Optional[str] = None,
         severity: Optional[Severity] = None,
diff --git a/checkov/common/runners/base_runner.py b/checkov/common/runners/base_runner.py
index bad89a9775b..65b1d082edf 100644
--- a/checkov/common/runners/base_runner.py
+++ b/checkov/common/runners/base_runner.py
@@ -24,6 +24,8 @@
     from checkov.common.graph.checks_infra.registry import BaseRegistry
     from checkov.common.typing import _CheckResult, LibraryGraphConnector
 
+_Context = TypeVar("_Context", bound="dict[Any, Any]|None")
+_Definitions = TypeVar("_Definitions", bound="dict[Any, Any]|None")
 _GraphManager = TypeVar("_GraphManager", bound="GraphManager[Any, Any]|None")
 
 
@@ -50,11 +52,11 @@ def strtobool(val: str) -> int:
 ignored_directories = IGNORED_DIRECTORIES_ENV.split(",")
 
 
-class BaseRunner(ABC, Generic[_GraphManager]):
+class BaseRunner(ABC, Generic[_Definitions, _Context, _GraphManager]):
     check_type = ""
-    definitions: dict[str, dict[str, Any] | list[dict[str, Any]]] | None = None
+    definitions: _Definitions | None = None
     raw_definitions: dict[str, list[tuple[int, str]]] | None = None
-    context: dict[str, dict[str, Any]] | None = None
+    context: _Context | None = None
     breadcrumbs = None
     external_registries: list[BaseRegistry] | None = None
     graph_manager: _GraphManager | None = None
@@ -106,8 +108,8 @@ def included_paths(self) -> Iterable[str]:
 
     def set_external_data(
         self,
-        definitions: dict[str, dict[str, Any] | list[dict[str, Any]]] | None,
-        context: dict[str, dict[str, Any]] | None,
+        definitions: _Definitions | None,
+        context: _Context | None,
         breadcrumbs: dict[str, dict[str, Any]] | None,
         **kwargs: Any,
     ) -> None:
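This `BaseRunner` change is the core of the patch: the class becomes generic over its definitions and context types as well as its graph manager, so each subclass can pin the exact dict shape it works with instead of falling back to `type:ignore[assignment]`. A minimal sketch of that shape, under illustrative names (`DemoBaseRunner`, `DemoYamlRunner` are not from the patch):

```python
from __future__ import annotations

from typing import Any, Generic, TypeVar

# Bounded TypeVars, mirroring the _Definitions/_Context pair added to base_runner.py.
_Definitions = TypeVar("_Definitions", bound="dict[Any, Any] | None")
_Context = TypeVar("_Context", bound="dict[Any, Any] | None")


class DemoBaseRunner(Generic[_Definitions, _Context]):
    definitions: _Definitions | None = None
    context: _Context | None = None

    def set_external_data(self, definitions: _Definitions | None, context: _Context | None) -> None:
        # Subclasses now receive precisely typed data instead of a
        # one-size-fits-all dict type that forced type:ignore comments.
        self.definitions = definitions
        self.context = context


# A concrete runner pins the parameters, as the ARM/CloudFormation runners do above.
class DemoYamlRunner(DemoBaseRunner["dict[str, dict[str, Any]]", "dict[str, dict[str, Any]]"]):
    pass


runner = DemoYamlRunner()
runner.set_external_data({"main.yaml": {"resources": {}}}, None)
print(runner.definitions)
```

Runners without definitions or a graph, such as the SCA and secrets runners later in the diff, simply parameterize with `None` (`BaseRunner[None, None, None]`).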
diff --git a/checkov/common/runners/object_runner.py b/checkov/common/runners/object_runner.py
index 01d281db1e6..6604942f297 100644
--- a/checkov/common/runners/object_runner.py
+++ b/checkov/common/runners/object_runner.py
@@ -8,7 +8,7 @@
 from collections.abc import Iterable
 from pathlib import Path
 from typing import Any, TYPE_CHECKING, Callable
-from typing_extensions import TypedDict
+from typing_extensions import TypedDict, TypeAlias
 
 from checkov.common.checks_infra.registry import get_graph_checks_registry
 from checkov.common.models.enums import CheckResult
@@ -30,6 +30,9 @@
     from checkov.common.graph.checks_infra.base_check import BaseGraphCheck
     from checkov.common.runners.graph_builder.local_graph import ObjectLocalGraph
 
+_ObjectContext: TypeAlias = "dict[str, dict[str, Any]]"
+_ObjectDefinitions: TypeAlias = "dict[str, dict[str, Any] | list[dict[str, Any]]]"
+
 
 class GhaMetadata(TypedDict):
     triggers: set[str]
@@ -37,7 +40,7 @@ class GhaMetadata(TypedDict):
     jobs: dict[int, str]
 
 
-class Runner(BaseRunner[ObjectGraphManager]):  # if a graph is added, Any needs to replaced
+class Runner(BaseRunner[_ObjectDefinitions, _ObjectContext, ObjectGraphManager]):
     def __init__(
         self,
         db_connector: LibraryGraphConnector | None = None,
@@ -46,8 +49,9 @@ def __init__(
         graph_manager: ObjectGraphManager | None = None,
     ) -> None:
         super().__init__()
-        self.definitions: dict[str, dict[str, Any] | list[dict[str, Any]]] = {}
+        self.definitions: _ObjectDefinitions = {}
         self.definitions_raw: dict[str, list[tuple[int, str]]] = {}
+        self.context: _ObjectContext | None = None
         self.map_file_path_to_gha_metadata_dict: dict[str, GhaMetadata] = {}
         self.root_folder: str | None = None
diff --git a/checkov/common/typing.py b/checkov/common/typing.py
index 7cb20ec95e9..7ff8db1b6be 100644
--- a/checkov/common/typing.py
+++ b/checkov/common/typing.py
@@ -13,7 +13,7 @@
     from igraph import Graph
     from checkov.terraform.modules.module_objects import TFDefinitionKey
 
-_BaseRunner = TypeVar("_BaseRunner", bound="BaseRunner[Any]")
+_BaseRunner = TypeVar("_BaseRunner", bound="BaseRunner[Any, Any, Any]")
 
 _ScannerCallableAlias: TypeAlias = Callable[
     [str, "BaseCheck", "list[_SkippedCheck]", "dict[str, Any]", str, str, "dict[str, Any]"], None
diff --git a/checkov/dockerfile/runner.py b/checkov/dockerfile/runner.py
index e012c3a2cf7..b52db37aeb4 100644
--- a/checkov/dockerfile/runner.py
+++ b/checkov/dockerfile/runner.py
@@ -5,6 +5,8 @@
 from collections.abc import Iterable
 from typing import TYPE_CHECKING, Any
 
+from typing_extensions import TypeAlias  # noqa[TC002]
+
 from checkov.common.checks_infra.registry import get_graph_checks_registry
 from checkov.common.models.enums import CheckResult
 from checkov.common.typing import LibraryGraphConnector
@@ -41,8 +43,11 @@
     from checkov.common.graph.checks_infra.base_check import BaseGraphCheck
     from checkov.common.images.image_referencer import Image
 
+_DockerfileContext: TypeAlias = "dict[str, dict[str, Any]]"
+_DockerfileDefinitions: TypeAlias = "dict[str, dict[str, list[_Instruction]]]"
+
 
-class Runner(ImageReferencerMixin["dict[str, dict[str, list[_Instruction]]]"], BaseRunner[DockerfileGraphManager]):
+class Runner(ImageReferencerMixin[_DockerfileDefinitions], BaseRunner[_DockerfileDefinitions, _DockerfileContext, DockerfileGraphManager]):
     check_type = CheckType.DOCKERFILE  # noqa: CCE003  # a static attribute
 
     def __init__(
@@ -61,8 +66,8 @@ def __init__(
         )
         self.graph_registry = get_graph_checks_registry(self.check_type)
 
-        self.context: dict[str, dict[str, Any]] = {}
-        self.definitions: "dict[str, dict[str, list[_Instruction]]]" = {}  # type:ignore[assignment]  # need to check, how to support subclass differences
+        self.context: _DockerfileContext = {}
+        self.definitions: _DockerfileDefinitions = {}
         self.definitions_raw: "dict[str, list[str]]" = {}  # type:ignore[assignment]
 
         self.root_folder: str | None = None
diff --git a/checkov/helm/runner.py b/checkov/helm/runner.py
index c1baf1cb980..4226cf05f05 100644
--- a/checkov/helm/runner.py
+++ b/checkov/helm/runner.py
@@ -21,7 +21,7 @@
 from checkov.helm.image_referencer.manager import HelmImageReferencerManager
 from checkov.helm.registry import registry
 from checkov.kubernetes.graph_builder.local_graph import KubernetesLocalGraph
-from checkov.kubernetes.runner import Runner as k8_runner, handle_timeout
+from checkov.kubernetes.runner import Runner as k8_runner, handle_timeout, _KubernetesContext, _KubernetesDefinitions
 from checkov.runner_filter import RunnerFilter
 
 import signal
@@ -122,7 +122,7 @@ def extract_images(
         return images
 
 
-class Runner(BaseRunner["KubernetesGraphManager"]):
+class Runner(BaseRunner[_KubernetesDefinitions, _KubernetesContext, "KubernetesGraphManager"]):
     check_type: str = CheckType.HELM  # noqa: CCE003  # a static attribute
     helm_command = 'helm'  # noqa: CCE003  # a static attribute
     system_deps = True  # noqa: CCE003  # a static attribute
diff --git a/checkov/kubernetes/runner.py b/checkov/kubernetes/runner.py
index 2c6d5e03da3..7644761031d 100644
--- a/checkov/kubernetes/runner.py
+++ b/checkov/kubernetes/runner.py
@@ -4,6 +4,8 @@
 import os
 from typing import Type, Any, TYPE_CHECKING
 
+from typing_extensions import TypeAlias  # noqa[TC002]
+
 from checkov.common.checks_infra.registry import get_graph_checks_registry
 from checkov.common.graph.checks_infra.registry import BaseRegistry
 from checkov.common.typing import LibraryGraphConnector
@@ -39,6 +41,9 @@
     from checkov.common.images.image_referencer import Image
     from checkov.common.typing import _CheckResult, _EntityContext
 
+_KubernetesContext: TypeAlias = "dict[str, dict[str, Any]]"
+_KubernetesDefinitions: TypeAlias = "dict[str, list[dict[str, Any]]]"
+
 
 class TimeoutError(Exception):
     pass
@@ -48,7 +53,7 @@ def handle_timeout(signum: int, frame: FrameType | None) -> Any:
     raise TimeoutError('command got timeout')
 
 
-class Runner(ImageReferencerMixin[None], BaseRunner[KubernetesGraphManager]):
+class Runner(ImageReferencerMixin[None], BaseRunner[_KubernetesDefinitions, _KubernetesContext, KubernetesGraphManager]):
    check_type = CheckType.KUBERNETES  # noqa: CCE003  # a static attribute

    def __init__(
@@ -69,8 +74,9 @@ def __init__(
             graph_manager if graph_manager else KubernetesGraphManager(source=source, db_connector=db_connector)
 
         self.graph_registry = get_graph_checks_registry(self.check_type)
-        self.definitions: "dict[str, list[dict[str, Any]]]" = {}  # type:ignore[assignment]
+        self.definitions: _KubernetesDefinitions = {}
         self.definitions_raw: "dict[str, list[tuple[int, str]]]" = {}
+        self.context: _KubernetesContext | None = None
         self.report_mutator_data: "dict[str, dict[str, Any]]" = {}
         self.report_type = report_type
diff --git a/checkov/kustomize/runner.py b/checkov/kustomize/runner.py
index f705bb95774..041301ce0c7 100644
--- a/checkov/kustomize/runner.py
+++ b/checkov/kustomize/runner.py
@@ -27,8 +27,7 @@
 from checkov.common.util.type_forcers import convert_str_to_bool
 from checkov.kubernetes.kubernetes_utils import create_check_result, get_resource_id, calculate_code_lines, \
     PARENT_RESOURCE_ID_KEY_NAME
-from checkov.kubernetes.runner import Runner as K8sRunner
-from checkov.kubernetes.runner import _get_entity_abs_path
+from checkov.kubernetes.runner import Runner as K8sRunner, _get_entity_abs_path, _KubernetesContext, _KubernetesDefinitions
 from checkov.kustomize.image_referencer.manager import KustomizeImageReferencerManager
 from checkov.kustomize.utils import get_kustomize_version, get_kubectl_version
 from checkov.runner_filter import RunnerFilter
@@ -65,7 +64,7 @@ def __init__(
 
     def set_external_data(
         self,
-        definitions: dict[str, dict[str, Any] | list[dict[str, Any]]] | None,
+        definitions: _KubernetesDefinitions | None,
         context: dict[str, dict[str, Any]] | None,
         breadcrumbs: dict[str, dict[str, Any]] | None,
         report_mutator_data: dict[str, dict[str, Any]] | None = None,
@@ -354,7 +353,7 @@ def extract_images(
         return images
 
 
-class Runner(BaseRunner["KubernetesGraphManager"]):
+class Runner(BaseRunner[_KubernetesDefinitions, _KubernetesContext, "KubernetesGraphManager"]):
     kustomize_command = 'kustomize'  # noqa: CCE003  # a static attribute
     kubectl_command = 'kubectl'  # noqa: CCE003  # a static attribute
     check_type = CheckType.KUSTOMIZE  # noqa: CCE003  # a static attribute
diff --git a/checkov/sca_package/runner.py b/checkov/sca_package/runner.py
index ff9a693047a..552d0560cf2 100644
--- a/checkov/sca_package/runner.py
+++ b/checkov/sca_package/runner.py
@@ -17,7 +17,7 @@
 from checkov.sca_package.scanner import Scanner
 
 
-class Runner(BaseRunner[None]):
+class Runner(BaseRunner[None, None, None]):
     check_type = CheckType.SCA_PACKAGE  # noqa: CCE003  # a static attribute
 
     def __init__(self, report_type: str = check_type) -> None:
diff --git a/checkov/sca_package_2/runner.py b/checkov/sca_package_2/runner.py
index 8ee91777bbb..d68525f2800 100644
--- a/checkov/sca_package_2/runner.py
+++ b/checkov/sca_package_2/runner.py
@@ -19,7 +19,7 @@
 from checkov.sca_package_2.scanner import Scanner
 
 
-class Runner(BaseRunner[None]):
+class Runner(BaseRunner[None, None, None]):
     check_type = CheckType.SCA_PACKAGE  # noqa: CCE003  # a static attribute
 
     def __init__(self, report_type: str = check_type) -> None:
diff --git a/checkov/secrets/runner.py b/checkov/secrets/runner.py
index 74eba7f995d..860bd970696 100644
--- a/checkov/secrets/runner.py
+++ b/checkov/secrets/runner.py
@@ -81,7 +81,7 @@
 MAX_FILE_SIZE = int(os.getenv('CHECKOV_MAX_FILE_SIZE', '5000000'))  # 5 MB is default limit
 
 
-class Runner(BaseRunner[None]):
+class Runner(BaseRunner[None, None, None]):
     check_type = CheckType.SECRETS  # noqa: CCE003  # a static attribute
 
     def __init__(self, file_extensions: Iterable[str] | None = None, file_names: Iterable[str] | None = None):
diff --git a/checkov/terraform/checks/resource/aws/IAMRoleAllowsPublicAssume.py b/checkov/terraform/checks/resource/aws/IAMRoleAllowsPublicAssume.py
index b8df5fa592a..b0460cb1179 100644
--- a/checkov/terraform/checks/resource/aws/IAMRoleAllowsPublicAssume.py
+++ b/checkov/terraform/checks/resource/aws/IAMRoleAllowsPublicAssume.py
@@ -25,7 +25,7 @@ def scan_resource_conf(self, conf):
                 if 'AWS' in statement['Principal']:
                     # Can be a string or an array of strings
                     aws = statement['Principal']['AWS']
-                    if (type(aws) == str and aws == '*') or (type(aws) == list and '*' in aws):
+                    if (isinstance(aws, str) and aws == '*') or (isinstance(aws, list) and '*' in aws):
                         return CheckResult.FAILED
         except Exception:  # nosec
             pass
diff --git a/checkov/terraform/checks/resource/aws/KMSKeyWildcardPrincipal.py b/checkov/terraform/checks/resource/aws/KMSKeyWildcardPrincipal.py
index 6e516cd266d..878e6c0a97a 100644
--- a/checkov/terraform/checks/resource/aws/KMSKeyWildcardPrincipal.py
+++ b/checkov/terraform/checks/resource/aws/KMSKeyWildcardPrincipal.py
@@ -26,11 +26,11 @@ def scan_resource_conf(self, conf):
                 continue
             if 'AWS' in principal:
                 aws = principal['AWS']
-                if (type(aws) == str and aws == '*') or (type(aws) == list and '*' in aws):
+                if (isinstance(aws, str) and aws == '*') or (isinstance(aws, list) and '*' in aws):
                     idx_evaluated_key = f'[{idx}]/' if isinstance(policy_block['Statement'], list) else ''
                     self.evaluated_keys = [f'policy/[0]/Statement/{idx_evaluated_key}Principal/AWS']
                     return CheckResult.FAILED
-            if (type(principal) == str and principal == '*') or (type(principal) == list and '*' in principal):
+            if (isinstance(principal, str) and principal == '*') or (isinstance(principal, list) and '*' in principal):
                 idx_evaluated_key = f'[{idx}]/' if isinstance(policy_block['Statement'], list) else ''
                 self.evaluated_keys = [f'policy/[0]/Statement/{idx_evaluated_key}Principal']
                 return CheckResult.FAILED
diff --git a/checkov/terraform/checks/resource/aws/S3ProtectAgainstPolicyLockout.py b/checkov/terraform/checks/resource/aws/S3ProtectAgainstPolicyLockout.py
index 83ba861915a..cbcb9627515 100644
--- a/checkov/terraform/checks/resource/aws/S3ProtectAgainstPolicyLockout.py
+++ b/checkov/terraform/checks/resource/aws/S3ProtectAgainstPolicyLockout.py
@@ -1,20 +1,22 @@
+from __future__ import annotations
+
+import json
+from typing import Any
+
 from checkov.common.models.enums import CheckResult, CheckCategories
 from checkov.terraform.checks.resource.base_resource_check import BaseResourceCheck
 from checkov.common.util.type_forcers import force_list
-import json
-from typing import List
 
 
 class S3ProtectAgainstPolicyLockout(BaseResourceCheck):
-
-    def __init__(self):
+    def __init__(self) -> None:
         name = "Ensure S3 bucket policy does not lockout all but root user. (Prevent lockouts needing root account fixes)"
         id = "CKV_AWS_93"
-        supported_resources = ['aws_s3_bucket', 'aws_s3_bucket_policy']
-        categories = [CheckCategories.IAM]
+        supported_resources = ('aws_s3_bucket', 'aws_s3_bucket_policy')
+        categories = (CheckCategories.IAM,)
         super().__init__(name=name, id=id, categories=categories, supported_resources=supported_resources)
 
-    def scan_resource_conf(self, conf):
+    def scan_resource_conf(self, conf: dict[str, list[Any]]) -> CheckResult:
         if 'policy' not in conf.keys() or not isinstance(conf['policy'][0], str):
             return CheckResult.PASSED
         try:
@@ -32,7 +34,7 @@ def scan_resource_conf(self, conf):
                 if 'AWS' in statement['Principal']:
                     # Can be a string or an array of strings
                     aws = statement['Principal']['AWS']
-                    if (type(aws) == str and aws == '*') or (type(aws) == list and '*' in aws):
+                    if (isinstance(aws, str) and aws == '*') or (isinstance(aws, list) and '*' in aws):
                         return CheckResult.FAILED
 
                 action = statement['Action']
@@ -41,13 +43,13 @@ def scan_resource_conf(self, conf):
                 if 's3' in statement['Action']:
                     # Can be a string or an array of strings
                     s3 = statement['Action']['s3']
-                    if (type(s3) == str and s3 == '*') or (type(s3) == list and '*' in s3):
+                    if (isinstance(s3, str) and s3 == '*') or (isinstance(s3, list) and '*' in s3):
                         return CheckResult.FAILED
 
         except Exception:  # nosec
             pass
 
         return CheckResult.PASSED
 
-    def get_evaluated_keys(self) -> List[str]:
+    def get_evaluated_keys(self) -> list[str]:
         return ['policy']
diff --git a/checkov/terraform/checks/resource/gcp/GoogleCloudSqlDatabasePubliclyAccessible.py b/checkov/terraform/checks/resource/gcp/GoogleCloudSqlDatabasePubliclyAccessible.py
index 218f178c55e..ae2dee1810a 100644
--- a/checkov/terraform/checks/resource/gcp/GoogleCloudSqlDatabasePubliclyAccessible.py
+++ b/checkov/terraform/checks/resource/gcp/GoogleCloudSqlDatabasePubliclyAccessible.py
@@ -22,12 +22,12 @@ def scan_resource_conf(self, conf):
             self.evaluated_keys = ['settings/[0]/ip_configuration']
             if 'authorized_networks' in ip_config:
                 auth_networks = ip_config['authorized_networks']
-                if type(auth_networks) != list:  # handle possible legacy case
legacy case auth_networks = [auth_networks] for network in auth_networks: if 'value' in network: val = network['value'] - if type(val) == list: # handle possible parsing discrepancies + if isinstance(val, list): # handle possible parsing discrepancies val = val[0] if val.endswith('/0'): self.evaluated_keys = ['settings/[0]/ip_configuration/authorized_networks/[0]/value', diff --git a/checkov/terraform/checks/resource/kubernetes/AllowedCapabilities.py b/checkov/terraform/checks/resource/kubernetes/AllowedCapabilities.py index 9e8f55492c3..47dacf647db 100644 --- a/checkov/terraform/checks/resource/kubernetes/AllowedCapabilities.py +++ b/checkov/terraform/checks/resource/kubernetes/AllowedCapabilities.py @@ -35,7 +35,7 @@ def scan_resource_conf(self, conf) -> CheckResult: containers = spec.get("container") for idx, container in enumerate(containers): - if type(container) != dict: + if not isinstance(container, dict): return CheckResult.UNKNOWN if container.get("security_context"): context = container.get("security_context")[0] diff --git a/checkov/terraform/checks/resource/kubernetes/AllowedCapabilitiesSysAdmin.py b/checkov/terraform/checks/resource/kubernetes/AllowedCapabilitiesSysAdmin.py index ae921b85f5f..a663ea3310e 100644 --- a/checkov/terraform/checks/resource/kubernetes/AllowedCapabilitiesSysAdmin.py +++ b/checkov/terraform/checks/resource/kubernetes/AllowedCapabilitiesSysAdmin.py @@ -33,7 +33,7 @@ def scan_resource_conf(self, conf) -> CheckResult: containers = spec.get("container") for idx, container in enumerate(containers): - if type(container) != dict: + if not isinstance(container, dict): return CheckResult.UNKNOWN if container.get("security_context") and isinstance(container.get("security_context"), list): context = container.get("security_context")[0] diff --git a/checkov/terraform/checks/resource/kubernetes/CPULimits.py b/checkov/terraform/checks/resource/kubernetes/CPULimits.py index c77fdc8994b..034c57765fb 100644 --- a/checkov/terraform/checks/resource/kubernetes/CPULimits.py +++ b/checkov/terraform/checks/resource/kubernetes/CPULimits.py @@ -36,7 +36,7 @@ def scan_resource_conf(self, conf: dict[str, list[Any]]) -> CheckResult: if not containers: return CheckResult.UNKNOWN for idx, container in enumerate(containers): - if type(container) != dict: + if not isinstance(container, dict): return CheckResult.UNKNOWN if container.get("resources"): resources = container.get("resources")[0] diff --git a/checkov/terraform/checks/resource/kubernetes/CPURequests.py b/checkov/terraform/checks/resource/kubernetes/CPURequests.py index 2a92eef32f9..fe7a5f4514e 100644 --- a/checkov/terraform/checks/resource/kubernetes/CPURequests.py +++ b/checkov/terraform/checks/resource/kubernetes/CPURequests.py @@ -32,7 +32,7 @@ def scan_resource_conf(self, conf) -> CheckResult: if containers is None: return CheckResult.UNKNOWN for idx, container in enumerate(containers): - if type(container) != dict: + if not isinstance(container, dict): return CheckResult.UNKNOWN if container.get("resources"): resources = container.get("resources")[0] diff --git a/checkov/terraform/checks/resource/kubernetes/ContainerSecurityContext.py b/checkov/terraform/checks/resource/kubernetes/ContainerSecurityContext.py index 1cc77ab8ac7..8c09d1a4831 100644 --- a/checkov/terraform/checks/resource/kubernetes/ContainerSecurityContext.py +++ b/checkov/terraform/checks/resource/kubernetes/ContainerSecurityContext.py @@ -34,7 +34,7 @@ def scan_resource_conf(self, conf) -> CheckResult: containers = spec.get("container") for idx, 
diff --git a/checkov/terraform/checks/resource/kubernetes/ContainerSecurityContext.py b/checkov/terraform/checks/resource/kubernetes/ContainerSecurityContext.py
index 1cc77ab8ac7..8c09d1a4831 100644
--- a/checkov/terraform/checks/resource/kubernetes/ContainerSecurityContext.py
+++ b/checkov/terraform/checks/resource/kubernetes/ContainerSecurityContext.py
@@ -34,7 +34,7 @@ def scan_resource_conf(self, conf) -> CheckResult:
         containers = spec.get("container")
 
         for idx, container in enumerate(containers):
-            if type(container) != dict:
+            if not isinstance(container, dict):
                 return CheckResult.UNKNOWN
             if not container.get("security_context"):
                 self.evaluated_keys = [f'{evaluated_keys_path}/[0]/container/[{idx}]/security_context']
diff --git a/checkov/terraform/checks/resource/kubernetes/DropCapabilities.py b/checkov/terraform/checks/resource/kubernetes/DropCapabilities.py
index 4dce0f7af93..bea4f9b8590 100644
--- a/checkov/terraform/checks/resource/kubernetes/DropCapabilities.py
+++ b/checkov/terraform/checks/resource/kubernetes/DropCapabilities.py
@@ -38,7 +38,7 @@ def scan_resource_conf(self, conf) -> CheckResult:
         containers = spec.get("container")
 
         for idx, container in enumerate(containers):
-            if type(container) != dict:
+            if not isinstance(container, dict):
                 return CheckResult.UNKNOWN
             dropped = False
             if container.get("security_context") and isinstance(container.get("security_context"), list):
diff --git a/checkov/terraform/checks/resource/kubernetes/MemoryLimits.py b/checkov/terraform/checks/resource/kubernetes/MemoryLimits.py
index 8b90b30a2dd..0a970762dc9 100644
--- a/checkov/terraform/checks/resource/kubernetes/MemoryLimits.py
+++ b/checkov/terraform/checks/resource/kubernetes/MemoryLimits.py
@@ -33,7 +33,7 @@ def scan_resource_conf(self, conf) -> CheckResult:
         if containers is None:
             return CheckResult.UNKNOWN
         for idx, container in enumerate(containers):
-            if type(container) != dict:
+            if not isinstance(container, dict):
                 return CheckResult.UNKNOWN
             if container.get("resources"):
                 resources = container.get("resources")[0]
diff --git a/checkov/terraform/checks/resource/kubernetes/MemoryRequests.py b/checkov/terraform/checks/resource/kubernetes/MemoryRequests.py
index cf0c8aecc83..1359e0a2629 100644
--- a/checkov/terraform/checks/resource/kubernetes/MemoryRequests.py
+++ b/checkov/terraform/checks/resource/kubernetes/MemoryRequests.py
@@ -32,7 +32,7 @@ def scan_resource_conf(self, conf) -> CheckResult:
         if containers is None:
             return CheckResult.UNKNOWN
         for idx, container in enumerate(containers):
-            if type(container) != dict:
+            if not isinstance(container, dict):
                 return CheckResult.UNKNOWN
             if container.get("resources"):
                 resources = container.get("resources")[0]
diff --git a/checkov/terraform/checks/resource/kubernetes/PodSecurityContext.py b/checkov/terraform/checks/resource/kubernetes/PodSecurityContext.py
index b6356f35278..e67bc736f3c 100644
--- a/checkov/terraform/checks/resource/kubernetes/PodSecurityContext.py
+++ b/checkov/terraform/checks/resource/kubernetes/PodSecurityContext.py
@@ -28,7 +28,7 @@ def scan_resource_conf(self, conf) -> CheckResult:
             containers = spec.get("container")
 
             for idx, container in enumerate(containers):
-                if type(container) != dict:
+                if not isinstance(container, dict):
                     return CheckResult.UNKNOWN
 
                 if not container.get("security_context"):
@@ -44,7 +44,7 @@ def scan_resource_conf(self, conf) -> CheckResult:
             containers = temp_spec.get("container")
 
             for idx, container in enumerate(containers):
-                if type(container) != dict:
+                if not isinstance(container, dict):
                     return CheckResult.UNKNOWN
 
                 if not container.get("security_context"):
diff --git a/checkov/terraform/checks/resource/kubernetes/PrivilegedContainer.py b/checkov/terraform/checks/resource/kubernetes/PrivilegedContainer.py
index cb3941c9930..47e438c5760 100644
--- a/checkov/terraform/checks/resource/kubernetes/PrivilegedContainer.py
+++ b/checkov/terraform/checks/resource/kubernetes/PrivilegedContainer.py
@@ -34,7 +34,7 @@ def scan_resource_conf(self, conf) -> CheckResult:
         containers = spec.get("container")
 
         for idx, container in enumerate(containers):
-            if type(container) != dict:
+            if not isinstance(container, dict):
                 return CheckResult.UNKNOWN
             if container.get("security_context"):
                 context = container.get("security_context")[0]
diff --git a/checkov/terraform/graph_manager.py b/checkov/terraform/graph_manager.py
index f4fd9c9f65a..1dddc9c8211 100644
--- a/checkov/terraform/graph_manager.py
+++ b/checkov/terraform/graph_manager.py
@@ -1,7 +1,7 @@
 from __future__ import annotations
 
 import logging
-from typing import Type, Any, TYPE_CHECKING
+from typing import Type, Any, TYPE_CHECKING, overload
 
 from checkov.common.util.consts import DEFAULT_EXTERNAL_MODULES_DIR
 from checkov.terraform.graph_builder.local_graph import TerraformLocalGraph
@@ -87,8 +87,23 @@ def build_graph_from_source_directory(
 
         return local_graph, tf_definitions
 
-    def build_graph_from_definitions(self, definitions: dict[TFDefinitionKey, dict[str, Any]],
-                                     render_variables: bool = True) -> TerraformLocalGraph:
+    @overload
+    def build_graph_from_definitions(
+        self, definitions: dict[str, dict[str, Any]], render_variables: bool = True,
+    ) -> TerraformLocalGraph:
+        ...
+
+    @overload
+    def build_graph_from_definitions(
+        self, definitions: dict[TFDefinitionKey, dict[str, Any]], render_variables: bool = True,
+    ) -> TerraformLocalGraph:
+        ...
+
+    def build_graph_from_definitions(
+        self,
+        definitions: dict[str, dict[str, Any]] | dict[TFDefinitionKey, dict[str, Any]],
+        render_variables: bool = True,
+    ) -> TerraformLocalGraph:
         module, _ = self.parser.parse_hcl_module_from_tf_definitions(definitions, "", self.source)
         local_graph = TerraformLocalGraph(module)
         local_graph.build_graph(render_variables=render_variables)
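`build_graph_from_definitions` now advertises, via `typing.overload`, that it accepts either plain string keys or `TFDefinitionKey` keys while keeping a single runtime implementation; `parse_hcl_module_from_tf_definitions` in `tf_parser.py` further down gets the same treatment. A compact sketch of the idiom with hypothetical names (`DemoKey`, `count_definitions`):

```python
from __future__ import annotations

from typing import Any, overload


class DemoKey:
    """Stand-in for a structured definition key such as TFDefinitionKey."""

    def __init__(self, file_path: str) -> None:
        self.file_path = file_path


@overload
def count_definitions(definitions: dict[str, dict[str, Any]]) -> int: ...


@overload
def count_definitions(definitions: dict[DemoKey, dict[str, Any]]) -> int: ...


def count_definitions(definitions: dict[str, dict[str, Any]] | dict[DemoKey, dict[str, Any]]) -> int:
    # One runtime implementation serves both key types; the overloads only
    # tell mypy which input/output pairings are legal.
    return len(definitions)


print(count_definitions({"main.tf": {}}))           # 1
print(count_definitions({DemoKey("main.tf"): {}}))  # 1
```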
diff --git a/checkov/terraform/runner.py b/checkov/terraform/runner.py
index de17f0f0bea..66e0ec2a3ee 100644
--- a/checkov/terraform/runner.py
+++ b/checkov/terraform/runner.py
@@ -8,6 +8,7 @@
 
 import dpath
 import igraph
+from typing_extensions import TypeAlias  # noqa[TC002]
 
 from checkov.common.checks_infra.registry import get_graph_checks_registry
 from checkov.common.graph.checks_infra.registry import BaseRegistry
@@ -47,8 +48,12 @@
 if TYPE_CHECKING:
     from networkx import DiGraph
+    from checkov.common.checks_infra.registry import Registry
     from checkov.common.images.image_referencer import Image
-    from checkov.common.typing import LibraryGraphConnector, _SkippedCheck
+    from checkov.common.typing import LibraryGraphConnector, _SkippedCheck, LibraryGraph
+
+_TerraformContext: TypeAlias = "dict[TFDefinitionKey, dict[str, Any]]"
+_TerraformDefinitions: TypeAlias = "dict[TFDefinitionKey, dict[str, Any]]"
 
 # Allow the evaluation of empty variables
 dpath.options.ALLOW_EMPTY_STRING_KEYS = True
@@ -56,7 +61,7 @@
 CHECK_BLOCK_TYPES = frozenset(['resource', 'data', 'provider', 'module'])
 
 
-class Runner(ImageReferencerMixin[None], BaseRunner[TerraformGraphManager]):
+class Runner(ImageReferencerMixin[None], BaseRunner[_TerraformDefinitions, _TerraformContext, TerraformGraphManager]):
     check_type = CheckType.TERRAFORM  # noqa: CCE003  # a static attribute
 
     def __init__(
@@ -72,15 +77,15 @@ def __init__(
         self.external_registries = [] if external_registries is None else external_registries
         self.graph_class = graph_class
         self.parser = parser or TFParser()
-        self.definitions: dict[TFDefinitionKey, dict[str, Any]] | None = None
-        self.context: dict[TFDefinitionKey, dict[str, Any]] | None = None
+        self.definitions: _TerraformDefinitions | None = None
+        self.context: _TerraformContext | None = None
         self.breadcrumbs = None
-        self.evaluations_context: Dict[str, Dict[str, EvaluationContext]] = {}
+        self.evaluations_context: Dict[TFDefinitionKey, Dict[str, EvaluationContext]] = {}
         self.graph_manager: TerraformGraphManager = graph_manager if graph_manager is not None else TerraformGraphManager(
             source=source,
             db_connector=db_connector or self.db_connector,
         )
-        self.graph_registry = get_graph_checks_registry(self.check_type)
+        self.graph_registry: Registry = get_graph_checks_registry(self.check_type)
         self.definitions_with_modules: dict[str, dict[str, Any]] = {}
         self.referrer_cache: Dict[str, str] = {}
         self.non_referred_cache: Set[str] = set()
@@ -94,7 +99,7 @@ def __init__(
 
     def run(
         self,
-        root_folder: str,
+        root_folder: str | None,
         external_checks_dir: list[str] | None = None,
         files: list[str] | None = None,
         runner_filter: RunnerFilter | None = None,
@@ -108,7 +113,7 @@ def run(
         parsing_errors: dict[str, Exception] = {}
         self.load_external_checks(external_checks_dir)
         local_graph = None
-        all_graphs = []
+        all_graphs: list[LibraryGraph] = []
         if self.context is None or self.definitions is None or self.breadcrumbs is None:
             self.definitions = {}
             logging.info("Scanning root folder and producing fresh tf_definitions and context")
@@ -147,12 +152,11 @@ def run(
             elif files:
                 files = [os.path.abspath(file) for file in files]
                 root_folder = os.path.split(os.path.commonprefix(files))[0]
-                self.parser.evaluate_variables = False
                 self._parse_files(files, parsing_errors)
 
                 if CHECKOV_CREATE_GRAPH:
                     if tf_split_graph:
-                        local_graph = self.graph_manager.build_multi_graph_from_definitions(self.definitions)
+                        local_graph = self.graph_manager.build_multi_graph_from_definitions(self.definitions)  # type:ignore[assignment]  # will be fixed after removing 'CHECKOV_CREATE_GRAPH'
                     else:
                         # local_graph needs to be a list to allow supporting multi graph
                         local_graph = [self.graph_manager.build_graph_from_definitions(self.definitions)]
@@ -160,9 +164,12 @@ def run(
                 raise Exception("Root directory was not specified, files were not specified")
 
             if CHECKOV_CREATE_GRAPH and local_graph:
-                self._update_definitions_and_breadcrumbs(all_graphs, local_graph, report, root_folder)
+                self._update_definitions_and_breadcrumbs(all_graphs, local_graph, report, root_folder)  # type:ignore[arg-type]  # will be fixed after removing 'CHECKOV_CREATE_GRAPH'
         else:
             logging.info("Scanning root folder using existing tf_definitions")
+            if root_folder is None:
+                # this shouldn't happen
+                raise Exception("Root directory was not specified")
 
             self.pbar.initiate(len(self.definitions))
             self.check_tf_definition(report, root_folder, runner_filter, collect_skip_comments)
@@ -193,7 +200,9 @@ def run(
 
         return report
 
-    def _update_definitions_and_breadcrumbs(self, all_graphs, local_graph, report, root_folder):
+    def _update_definitions_and_breadcrumbs(
+        self, all_graphs: list[LibraryGraph], local_graph: list[TerraformLocalGraph], report: Report, root_folder: str
+    ) -> None:
         self.definitions = {}
         self.breadcrumbs = {}
         for graph in local_graph:
@@ -252,7 +261,7 @@ def get_graph_checks_report(self, root_folder: str, runner_filter: RunnerFilter,
                             copy_of_check_result['result'] = CheckResult.SKIPPED
                             copy_of_check_result['suppress_comment'] = skipped_check['suppress_comment']
                             break
-                    copy_of_check_result['entity'] = entity.get(CustomAttributes.CONFIG)
+                    copy_of_check_result['entity'] = entity[CustomAttributes.CONFIG]
                    connected_node_data = self.get_connected_node(entity, root_folder)
                    if platform.system() == "Windows":
                        root_folder = os.path.split(full_file_path)[0]
@@ -266,7 +275,7 @@ def get_graph_checks_report(self, root_folder: str, runner_filter: RunnerFilter,
                    censored_code_lines = omit_secret_value_from_graph_checks(
                        check=check,
                        check_result=check_result,
-                        entity_code_lines=entity_context.get('code_lines'),
+                        entity_code_lines=entity_context.get('code_lines', []),
                        entity_config=entity_config,
                        resource_attributes_to_omit=runner_filter.resource_attr_to_omit
                    )
@@ -277,8 +286,10 @@ def get_graph_checks_report(self, root_folder: str, runner_filter: RunnerFilter,
                        check_result=copy_of_check_result,
                        code_block=censored_code_lines,
                        file_path=f"{os.sep}{os.path.relpath(full_file_path, root_folder)}",
-                        file_line_range=[entity_context.get('start_line'),
-                                         entity_context.get('end_line')],
+                        file_line_range=[
+                            entity_context.get('start_line', 1),
+                            entity_context.get('end_line', 1),
+                        ],
                        resource=resource,
                        entity_tags=entity.get('tags', {}),
                        evaluations=None,
@@ -304,13 +315,13 @@ def get_entity_context_and_evaluations(self, entity: dict[str, Any]) -> dict[str
        tf_source_module_obj = entity.get(CustomAttributes.SOURCE_MODULE_OBJECT)
        if isinstance(tf_source_module_obj, dict):
            tf_source_module_obj = TFModule.from_json(tf_source_module_obj)
-        full_file_path = TFDefinitionKey(file_path=entity.get(CustomAttributes.FILE_PATH),
+        full_file_path = TFDefinitionKey(file_path=entity[CustomAttributes.FILE_PATH],
                                         tf_source_modules=tf_source_module_obj)
        definition_path = entity[CustomAttributes.BLOCK_NAME].split('.')
        entity_context_path = [block_type] + definition_path
        try:
-            entity_context = self.context[full_file_path]
+            entity_context = self.context[full_file_path]  # type:ignore[index]  # at this point self.context is set
            for k in entity_context_path:
                if k in entity_context:
                    entity_context = entity_context[k]
@@ -331,10 +342,18 @@ def check_tf_definition(
        collect_skip_comments: bool = True,
    ) -> None:
        parser_registry.reset_definitions_context()
+        if not self.definitions:
+            # nothing to do
+            self.pbar.update()
+            self.pbar.close()
+            return
+
        if not self.context:
            definitions_context = {}
-            for definition in self.definitions.items():
-                definitions_context = parser_registry.enrich_definitions_context(definition, collect_skip_comments)
+            for definition_key_tuple in self.definitions.items():
+                definitions_context = parser_registry.enrich_definitions_context(
+                    definitions=definition_key_tuple, collect_skip_comments=collect_skip_comments
+                )
            self.context = definitions_context
            logging.debug('Created definitions context')
 
@@ -354,7 +373,7 @@ def check_tf_definition(
    def run_all_blocks(
        self,
        definition: dict[str, list[dict[str, Any]]],
-        definitions_context: dict[str, dict[str, Any]],
+        definitions_context: dict[TFDefinitionKey, dict[str, Any]],
        full_file_path: TFDefinitionKey,
        root_folder: str,
        report: Report,
@@ -374,13 +393,13 @@ def run_all_blocks(
    def run_block(
        self,
        entities: list[dict[str, Any]],
-        definition_context: dict[str, dict[str, Any]],
+        definition_context: dict[TFDefinitionKey, dict[str, Any]],
        full_file_path: TFDefinitionKey,
        root_folder: str,
        report: Report,
        scanned_file: str,
        block_type: str,
-        runner_filter: RunnerFilter | None = None,
+        runner_filter: RunnerFilter,
        entity_context_path_header: str | None = None,
        module_referrer: str | None = None,
    ) -> None:
@@ -413,14 +432,15 @@ def run_block(
                caller_context = definition_context[module_full_path].get(BlockType.MODULE, {}).get(module_name)
                if not caller_context:
                    continue
-                caller_file_line_range = [caller_context.get('start_line'), caller_context.get('end_line')]
+                caller_file_line_range = (caller_context.get('start_line', 1), caller_context.get('end_line', 1))
                abs_caller_file = get_abs_path(module_full_path)
                caller_file_path = f"{os.sep}{os.path.relpath(abs_caller_file, root_folder)}"
 
            if entity_context_path_header is None:
                entity_context_path = [block_type] + definition_path
            else:
-                entity_context_path = entity_context_path_header + block_type + definition_path
+                # TODO: check, if this code part is still used
+                entity_context_path = [entity_context_path_header, block_type] + definition_path
            # Entity can exist only once per dir, for file as well
            context_path = full_file_path
            try:
@@ -428,13 +448,13 @@ def run_block(
                    definition_context[context_path],
                    entity_context_path,
                )
-                entity_lines_range = [entity_context.get('start_line'), entity_context.get('end_line')]
-                entity_code_lines = entity_context.get('code_lines')
+                entity_lines_range = [entity_context.get('start_line', 1), entity_context.get('end_line', 1)]
+                entity_code_lines = entity_context.get('code_lines', [])
                skipped_checks = entity_context.get('skipped_checks')
            except KeyError:
                # TODO: Context info isn't working for modules
-                entity_lines_range = None
-                entity_code_lines = None
+                entity_lines_range = [1, 1]
+                entity_code_lines = []
                skipped_checks = None
 
            if full_file_path in self.evaluations_context:
@@ -478,7 +498,7 @@ def run_block(
                        details=check.details,
                        definition_context_file_path=full_file_path.file_path
                    )
-                    if CHECKOV_CREATE_GRAPH:
+                    if CHECKOV_CREATE_GRAPH and self.breadcrumbs:
                        entity_key = entity_id
                        breadcrumb = self.breadcrumbs.get(record.file_path, {}).get(entity_key)
                        if breadcrumb:
@@ -498,7 +518,11 @@ def run_block(
                    )
 
    def _parse_files(self, files: list[str], parsing_errors: dict[str, Exception]) -> None:
-        def parse_file(file: str) -> tuple[str, dict[str, Any], dict[str, Exception]] | None:
+        if self.definitions is None:
+            # just make sure it is not 'None'
+            self.definitions = {}
+
+        def parse_file(file: str) -> tuple[str, dict[str, Any] | None, dict[str, Exception]] | None:
            if not (file.endswith(".tf") or file.endswith(".hcl")):
                return None
            file_parsing_errors: dict[str, Exception] = {}
@@ -518,6 +542,10 @@ def parse_file(file: str) -> tuple[str, dict[str, Any], dict[str, Exception]] |
        parsing_errors.update(file_parsing_errors)
 
    def push_skipped_checks_down_from_modules(self, definition_context: dict[TFDefinitionKey, dict[str, Any]]) -> None:
+        if not self.definitions:
+            # no need to proceed
+            return
+
        module_context_parser = parser_registry.context_parsers[BlockType.MODULE]
        for tf_definition_key, definition in self.definitions.items():
            full_file_path = tf_definition_key
@@ -544,11 +572,15 @@ def push_skipped_checks_down(
                continue
 
            if block_type == "module":
+                if not self.definitions:
+                    # no need to proceed
+                    continue
+
                # modules don't have a type, just a name
                for module_name, module_config in block_configs.items():
                    # append the skipped checks also from a module to another module
                    module_config["skipped_checks"] += skipped_checks
-                    module_context = next(m for m in self.definitions.get(resolved_paths[ind]).get(block_type) if module_name in m)
+                    module_context = next(m for m in self.definitions.get(resolved_paths[ind], {}).get(block_type, []) if module_name in m)
                    recursive_resolved_paths = module_context.get(module_name).get(RESOLVED_MODULE_ENTRY_NAME)
                    self.push_skipped_checks_down(definition_context, skipped_checks, recursive_resolved_paths)
            else:
@@ -559,41 +591,6 @@ def push_skipped_checks_down(
                    # append the skipped checks from the module to the other resources.
                    resource_config["skipped_checks"] += skipped_checks
 
-    def _find_id_for_referrer(self, full_file_path: str) -> Optional[str]:
-        cached_referrer = self.referrer_cache.get(full_file_path)
-        if cached_referrer:
-            return cached_referrer
-        if full_file_path in self.non_referred_cache:
-            return None
-
-        if not self.definitions_with_modules:
-            self._prepare_definitions_with_modules()
-        for file_content in self.definitions_with_modules.values():
-            for modules in file_content["module"]:
-                for module_name, module_content in modules.items():
-                    if RESOLVED_MODULE_ENTRY_NAME not in module_content:
-                        continue
-
-                    if full_file_path in module_content[RESOLVED_MODULE_ENTRY_NAME]:
-                        id_referrer = f"module.{module_name}"
-                        self.referrer_cache[full_file_path] = id_referrer
-                        return id_referrer
-
-        self.non_referred_cache.add(full_file_path)
-        return None
-
-    def _prepare_definitions_with_modules(self) -> None:
-        def __cache_file_content(file_name: str, file_modules: list[dict[str, Any]]) -> None:
-            for modules in file_modules:
-                for module_content in modules.values():
-                    if RESOLVED_MODULE_ENTRY_NAME in module_content:
-                        self.definitions_with_modules[file_name] = file_content
-                        return
-
-        for file, file_content in self.definitions.items():
-            if "module" in file_content:
-                __cache_file_content(file_name=file, file_modules=file_content["module"])
-
    def extract_images(
        self,
        graph_connector: DiGraph | None = None,
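A recurring detail in the `terraform/runner.py` changes is replacing `None`-propagation with concrete fallbacks: line ranges default to `1`, code lines to `[]`, and dictionary lookups get explicit defaults, which is presumably what allows the file to come back under strict mypy (see the `mypy.ini` exclude change below). A small standalone illustration of the pattern, using a hypothetical `entity_context` dict:

```python
from __future__ import annotations

from typing import Any

# Hypothetical context entry in the shape the runner reads from.
entity_context: dict[str, Any] = {"start_line": 7, "end_line": 13}

# Old style: missing keys silently became None and every consumer had to cope.
old_range = [entity_context.get("start_line"), entity_context.get("end_line")]

# New style, as in the patch: fall back to harmless defaults so the values
# are always ints / lists and the annotations stay honest.
new_range = [entity_context.get("start_line", 1), entity_context.get("end_line", 1)]
code_lines = entity_context.get("code_lines", [])

print(old_range, new_range, code_lines)  # [7, 13] [7, 13] []
```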
diff --git a/checkov/terraform/tf_parser.py b/checkov/terraform/tf_parser.py
index e0e944092d5..15501a2adf7 100644
--- a/checkov/terraform/tf_parser.py
+++ b/checkov/terraform/tf_parser.py
@@ -5,7 +5,7 @@
 import os
 from collections import defaultdict
 from pathlib import Path
-from typing import Optional, Dict, Mapping, Set, Tuple, Callable, Any, List, cast, TYPE_CHECKING
+from typing import Optional, Dict, Mapping, Set, Tuple, Callable, Any, List, cast, TYPE_CHECKING, overload
 
 import deep_merge
 import hcl2
@@ -429,12 +429,30 @@ def get_idx_by_module_name(module_data_list: list[dict[str, Any]], module_name:
 
        return None
 
+    @overload
    def parse_hcl_module_from_tf_definitions(
        self,
-        tf_definitions: Dict[TFDefinitionKey, Dict[str, Any]],
+        tf_definitions: dict[str, dict[str, Any]],
        source_dir: str,
        source: str,
-    ) -> Tuple[Module, Dict[TFDefinitionKey, Dict[str, Any]]]:
+    ) -> tuple[Module, dict[str, dict[str, Any]]]:
+        ...
+
+    @overload
+    def parse_hcl_module_from_tf_definitions(
+        self,
+        tf_definitions: dict[TFDefinitionKey, dict[str, Any]],
+        source_dir: str,
+        source: str,
+    ) -> tuple[Module, dict[TFDefinitionKey, dict[str, Any]]]:
+        ...
+
+    def parse_hcl_module_from_tf_definitions(
+        self,
+        tf_definitions: dict[str, dict[str, Any]] | dict[TFDefinitionKey, dict[str, Any]],
+        source_dir: str,
+        source: str,
+    ) -> tuple[Module, dict[str, dict[str, Any]] | dict[TFDefinitionKey, dict[str, Any]]]:
        module = self.get_new_module(
            source_dir=source_dir,
            external_modules_source_map=self.external_modules_source_map,
diff --git a/checkov/terraform_json/runner.py b/checkov/terraform_json/runner.py
index 1519988750c..b90540ce616 100644
--- a/checkov/terraform_json/runner.py
+++ b/checkov/terraform_json/runner.py
@@ -51,8 +51,9 @@ def __init__(
        self.file_extensions = TF_JSON_POSSIBLE_FILE_ENDINGS  # override what gets set from the TF runner
        self.graph_registry = get_graph_checks_registry(super().check_type)
 
-        self.definitions: dict[str, dict[str, Any]] = {}
-        self.context: dict[str, dict[str, Any]] = {}
+        self.definitions: dict[str, dict[str, Any]] = {}  # type:ignore[assignment]  # need to check, how to support subclass differences
+        self.definitions_raw: "dict[str, list[tuple[int, str]]]" = {}
+        self.context: dict[str, dict[str, Any]] = {}  # type:ignore[assignment]
        self.root_folder: str | None = None
 
    def run(
@@ -177,7 +178,7 @@ def add_graph_check_results(self, report: Report, runner_filter: RunnerFilter) -
                    record.set_guideline(guideline=check.guideline)
                report.add_record(record=record)
 
-    def run_block(
+    def run_block(  # type:ignore[override]  # would probably need to make 'TerraformRunner' generic
        self,
        entities: list[dict[str, Any]],
        definition_context: dict[str, Any],
diff --git a/mypy.ini b/mypy.ini
index 3d5acb6dead..02290e54232 100644
--- a/mypy.ini
+++ b/mypy.ini
@@ -2,7 +2,7 @@
 mypy_path = extra_stubs
 files = checkov
-exclude = checkov/(arm/checks|cloudformation/checks|kubernetes/checks|serverless|terraform/(checks|parser.py|plan_runner.py|runner.py))
+exclude = checkov/(arm/checks|cloudformation/checks|kubernetes/checks|serverless|terraform/(checks|plan_runner.py))
 strict = True
 disallow_subclassing_any = False
 implicit_reexport = True
 
@@ -26,9 +26,6 @@ ignore_missing_imports = True
 
 [mypy-networkx.*]
 ignore_missing_imports = True
 
-[mypy-spdx.*]
-ignore_missing_imports = True
-
 [mypy-license_expression.*]
 ignore_missing_imports = True
diff --git a/pyproject.toml b/pyproject.toml
index 54e3e041e81..77b73ffb00f 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,3 +1,8 @@
+[tool.bandit]
+exclude_dirs = [
+    "tests"
+]
+
 [tool.black]
 line-length = 120
 
@@ -119,3 +124,6 @@ source_modules = [
 forbidden_modules = [
     "checkov.arm",
 ]
+
+[tool.pytest.ini_options]
+addopts = "-n 2 --dist loadfile"
diff --git a/pytest.ini b/pytest.ini
deleted file mode 100644
index acc2bef60a1..00000000000
--- a/pytest.ini
+++ /dev/null
@@ -1,2 +0,0 @@
-[pytest]
-addopts = -n 2 --dist loadfile
diff --git a/tests/terraform/runner/test_runner.py b/tests/terraform/runner/test_runner.py
index c3103e2ec41..726d71c3d2d 100644
--- a/tests/terraform/runner/test_runner.py
+++ b/tests/terraform/runner/test_runner.py
@@ -1058,7 +1058,7 @@ def test_module_skip(self):
                assert record.caller_file_path == "/main.tf"
                # ATTENTION!! If this breaks, see the "HACK ALERT" comment in runner.run_block.
                # A bug might have been fixed.
-                self.assertEqual(record.caller_file_line_range, [6, 8])
+                self.assertEqual(record.caller_file_line_range, (6, 8))
            if "outside" in record.resource:
                found_outside = True
 
@@ -1112,7 +1112,7 @@ def test_module_failure_reporting_772(self):
                assert record.file_path == "/module/module.tf"
                self.assertEqual(record.file_line_range, [7, 13])
                assert record.caller_file_path == "/main.tf"
-                self.assertEqual(record.caller_file_line_range, [6, 8])
+                self.assertEqual(record.caller_file_line_range, (6, 8))
 
        self.assertTrue(found_inside)
        self.assertTrue(found_outside)
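The two test assertions change because `Record.caller_file_line_range` is now annotated as `tuple[int, int] | None` and `run_block` builds a tuple instead of a list; since `assertEqual` compares with `==` and a tuple never equals a list, the expected values move to `(6, 8)`. A minimal standalone illustration:

```python
# Tuples and lists never compare equal, even with identical elements, which is
# why the assertions had to change together with the runner:
caller_file_line_range = (6, 8)

print(caller_file_line_range == [6, 8])  # False
print(caller_file_line_range == (6, 8))  # True
```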