diff --git a/checkov/common/resource_code_logger_filter.py b/checkov/common/resource_code_logger_filter.py new file mode 100644 index 00000000000..f1e3aa98b2a --- /dev/null +++ b/checkov/common/resource_code_logger_filter.py @@ -0,0 +1,52 @@ +from __future__ import annotations + +import os +from logging import Logger, Filter, LogRecord + +from checkov.common.util.type_forcers import convert_str_to_bool + + +class ResourceCodeFilter(Filter): + """ + A custom logger filter designed to decide if we want to filter some logs from the default logger. + Could be used to reduce logs size. + First use case is to log without the actual code of resources, which takes a lot of the logs size. + The default is to log everything in order to keep api the same. + """ + CODE_TEMPLATES: list[str] = [] + + def __init__(self, allow_code_logging: bool = True): + super().__init__() + self.allow_code_logging = allow_code_logging + + def filter(self, record: LogRecord) -> bool: + if self.allow_code_logging: + return True + if hasattr(record, "mask"): + # Allows filtering using `logging.info("", extra={"mask": True})` + mask = record.mask + if not isinstance(mask, bool): + raise Exception(f"Expected to get `mask` as boolean for logging function, instead got: {mask} of type {type(mask)}") + return not record.mask + + msg = record.msg + return self._filter_based_on_msg(msg) + + def _filter_based_on_msg(self, msg: str) -> bool: + for code_template in ResourceCodeFilter.CODE_TEMPLATES: + if code_template in msg: + return False + return True + + +def add_resource_code_filter_to_logger(logger: Logger, allow_code_logging: bool | None = None) -> None: + if allow_code_logging is None: + allow_code_logging_res = convert_str_to_bool(os.environ.get("CHECKOV_ALLOW_CODE_LOGGING", True)) + if isinstance(allow_code_logging_res, bool): + allow_code_logging = allow_code_logging_res + else: + raise Exception(f"Failed to get correct result for env variable - `CHECKOV_ALLOW_CODE_LOGGING`. " + f"Got {allow_code_logging_res}") + + resource_code_filter = ResourceCodeFilter(allow_code_logging=allow_code_logging) + logger.addFilter(resource_code_filter) diff --git a/tests/common/test_resource_code_logger_filter.py b/tests/common/test_resource_code_logger_filter.py new file mode 100644 index 00000000000..3d1c478fd42 --- /dev/null +++ b/tests/common/test_resource_code_logger_filter.py @@ -0,0 +1,34 @@ +import logging +import mock + +from checkov.common.resource_code_logger_filter import add_resource_code_filter_to_logger + + +TEST_CODE_TEMPLATES_TO_REPLACE = "THIS-SHOULD-BE-REPLACED!" + + +def test_code_logger_filter_do_not_log_if_not_allowed(caplog) -> None: + with mock.patch("checkov.common.resource_code_logger_filter.ResourceCodeFilter.CODE_TEMPLATES", + [TEST_CODE_TEMPLATES_TO_REPLACE]): + logger = logging.getLogger("code logging not allowed") + add_resource_code_filter_to_logger(logger, allow_code_logging=False) + logger.warning(TEST_CODE_TEMPLATES_TO_REPLACE) + assert TEST_CODE_TEMPLATES_TO_REPLACE not in caplog.text + + +def test_code_logger_filter_logs_if_allowed(caplog) -> None: + with mock.patch("checkov.common.resource_code_logger_filter.ResourceCodeFilter.CODE_TEMPLATES", + [TEST_CODE_TEMPLATES_TO_REPLACE]): + logger = logging.getLogger("code logging allowed") + add_resource_code_filter_to_logger(logger) + logger.warning(TEST_CODE_TEMPLATES_TO_REPLACE) + assert TEST_CODE_TEMPLATES_TO_REPLACE in caplog.text + + +def test_code_logger_filter_logs_based_on_arg_not_allowed(caplog) -> None: + with mock.patch("checkov.common.resource_code_logger_filter.ResourceCodeFilter.CODE_TEMPLATES", + [TEST_CODE_TEMPLATES_TO_REPLACE]): + logger = logging.getLogger("code logging not allowed") + add_resource_code_filter_to_logger(logger, allow_code_logging=False) + logger.warning(TEST_CODE_TEMPLATES_TO_REPLACE, extra={"mask": True}) + assert TEST_CODE_TEMPLATES_TO_REPLACE not in caplog.text