Skip to content

Commit

Permalink
Improve the entropy keyword combinator secret scanner
Browse files Browse the repository at this point in the history
  • Loading branch information
ChanochShayner committed Jul 9, 2023
1 parent e4f3fa1 commit 6f38367
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 11 deletions.
14 changes: 6 additions & 8 deletions checkov/secrets/plugins/detector_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,11 @@
flags=re.IGNORECASE,
)

ALLOW_LIST = ('secretsmanager', "secretName") # can add more keys like that
ALLOW_LIST = ('secretsmanager', "secretName", "secret_name", "creation_token") # can add more keys like that
ALLOW_LIST_REGEX = r'|'.join(ALLOW_LIST)
# Support for suffix of function name i.e "secretsmanager:GetSecretValue"
CAMEL_CASE_NAMES = r'[A-Z]([A-Z0-9]*[a-z][a-z0-9]*[A-Z]|[a-z0-9]*[A-Z][A-Z0-9]*[a-z])[A-Za-z0-9]*'
FUNCTION_CALL_AFTER_KEYWORD_REGEX = re.compile(r'({allowlist}):\s*{suffix}'.format(
FUNCTION_CALL_AFTER_KEYWORD_REGEX = re.compile(r'({allowlist})\s*(:|=)\s*{suffix}'.format(
allowlist=ALLOW_LIST_REGEX,
suffix=AFFIX_REGEX,
))
Expand Down Expand Up @@ -178,13 +178,11 @@ def remove_fp_secrets_in_keys(detected_secrets: set[PotentialSecret], line: str)
formatted_line = line.replace('"', '').replace("'", '')
secrets_to_remove = set()
for detected_secret in detected_secrets:
if detected_secret.secret_value and formatted_line.startswith(
detected_secret.secret_value):
# Found keyword prefix as potential secret
# Found keyword prefix as potential secret
if detected_secret.secret_value and formatted_line.startswith(detected_secret.secret_value):
secrets_to_remove.add(detected_secret)
if detected_secret.secret_value and formatted_line and \
FUNCTION_CALL_AFTER_KEYWORD_REGEX.search(formatted_line):
# found a function name at the end of the line
# found a function name at the end of the line
if detected_secret.secret_value and formatted_line and FUNCTION_CALL_AFTER_KEYWORD_REGEX.search(formatted_line):
secrets_to_remove.add(detected_secret)
detected_secrets -= secrets_to_remove

Expand Down
10 changes: 7 additions & 3 deletions checkov/secrets/plugins/entropy_keyword_combinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
MAX_LINE_LENGTH = 10000
MAX_KEYWORD_LIMIT = 500
ENTROPY_KEYWORD_COMBINATOR_LIMIT = 3
ENTROPY_KEYWORD_LIMIT = 4.5
ENTROPY_KEYWORD_LIMIT = 4.8

DENY_LIST_REGEX = r'|'.join(DENYLIST)
# Support for suffix after keyword i.e. password_secure = "value"
Expand Down Expand Up @@ -121,6 +121,7 @@ class EntropyKeywordCombinator(BasePlugin):
def __init__(self, limit: float = ENTROPY_KEYWORD_LIMIT, max_line_length: int = MAX_LINE_LENGTH) -> None:
iac_limit = ENTROPY_KEYWORD_COMBINATOR_LIMIT
self.high_entropy_scanners_iac = (Base64HighEntropyString(limit=iac_limit), HexHighEntropyString(limit=iac_limit))
self.entropy_scanners_non_iac_with_keyword = (Base64HighEntropyString(limit=iac_limit + 0.3), HexHighEntropyString(limit=iac_limit + 0.3))
self.high_entropy_scanners = (Base64HighEntropyString(limit=limit), HexHighEntropyString(limit=limit))
self.keyword_scanner = KeywordDetector()
self.max_line_length = max_line_length
Expand Down Expand Up @@ -153,15 +154,18 @@ def analyze_line(
if single_line_parser:
# Getting last detected one as only 1 violation available for line
secret_value, quoted_secret = EntropyKeywordCombinator.receive_last_secret_detected(keyword_on_key)
old_line = line
line = quoted_secret if quoted_secret else line
return single_line_parser.detect_secret(
detected_secrets = single_line_parser.detect_secret(
scanners=self.high_entropy_scanners_iac,
filename=filename,
raw_context=raw_context,
line=line,
line_number=line_number,
kwargs=kwargs
)
remove_fp_secrets_in_keys(detected_secrets, old_line)
return detected_secrets
else:
# preprocess line before detecting secrets - add quotes on potential secrets to allow triggering
# entropy detector
Expand Down Expand Up @@ -207,7 +211,7 @@ def analyze_line(
else:
return detect_secret(
# If we found a keyword (i.e. db_pass = ), lower the threshold to the iac threshold
scanners=self.high_entropy_scanners if not keyword_on_key else self.high_entropy_scanners_iac,
scanners=self.high_entropy_scanners if not keyword_on_key else self.entropy_scanners_non_iac_with_keyword,
filename=filename,
line=line,
line_number=line_number,
Expand Down
22 changes: 22 additions & 0 deletions tests/secrets/sanity/iac_fp/a.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
test_files_dir = current_dir + "/example_LustreFSEncryptedWithCMK"

mock_url = mock_bc_integration.bc_api_url + "/api/v1/vulnerabilities/scan-results/2e97f5afea42664309f492a1e2083b43479c2936"

return "Properties/LogPublishingOptions/AUDIT_LOGS/Enabled"

if metadata_options['HttpTokens'] == "required":

"MDEyOk9yZ2FuaXphdGlvbjE\u003d"

'JSON Web Token': 'CKV_SECRET_9'

IAM_ROLE_STATEMENTS_TOKEN = 'iamRoleStatements'

'NPM tokens': 'CKV_SECRET_12'
'Slack Token': 'CKV_SECRET_14'
'SoftLayer Credentials': 'CKV_SECRET_15'
'Square OAuth Secret': 'CKV_SECRET_16'

self.go("GCPMySQLdbInstancePoint_In_TimeRecoveryBackupIsEnabled")

's3_origin_config': [{'origin_access_identity': ['origin-access-identity/cloudfront/ABCDEFG1234567']}]}]
3 changes: 3 additions & 0 deletions tests/secrets/sanity/non_iac_fp/main.tf
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
secret_name = "example_secret_name"

creation_token = "my-product"
24 changes: 24 additions & 0 deletions tests/secrets/test_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,30 @@ def test_sanity_check_secrets(self):
self.assertEqual(report.skipped_checks, [])
report.print_console()

def test_fp_sanity_check_secrets_non_iac(self):
current_dir = os.path.dirname(os.path.realpath(__file__))
valid_dir_path = current_dir + "/sanity/iac_fp"
runner = Runner()
report = runner.run(root_folder=valid_dir_path, external_checks_dir=None,
runner_filter=RunnerFilter(framework=['secrets'], checks=['CKV_SECRET_6'], enable_secret_scan_all_files=True))
self.assertEqual(len(report.failed_checks), 0)
self.assertEqual(report.parsing_errors, [])
self.assertEqual(report.passed_checks, [])
self.assertEqual(report.skipped_checks, [])
report.print_console()

def test_fp_sanity_check_secrets_iac(self):
current_dir = os.path.dirname(os.path.realpath(__file__))
valid_dir_path = current_dir + "/sanity/non_iac_fp"
runner = Runner()
report = runner.run(root_folder=valid_dir_path, external_checks_dir=None,
runner_filter=RunnerFilter(framework=['secrets'], checks=['CKV_SECRET_6'], enable_secret_scan_all_files=True))
self.assertEqual(len(report.failed_checks), 0)
self.assertEqual(report.parsing_errors, [])
self.assertEqual(report.passed_checks, [])
self.assertEqual(report.skipped_checks, [])
report.print_console()

def test_sanity_check_non_secrets(self):
current_dir = os.path.dirname(os.path.realpath(__file__))
valid_dir_path = current_dir + "/sanity/non_secrets"
Expand Down

0 comments on commit 6f38367

Please sign in to comment.