From edccb7c402d7b36ec520f98ee6500ace74ce8a24 Mon Sep 17 00:00:00 2001 From: Roman Babenko Date: Mon, 19 Jun 2023 15:57:39 +0300 Subject: [PATCH 1/2] [refactoring] 2023-06-19T15:57:39+03:00 --- credsweeper/rules/rule.py | 16 ++++ credsweeper/scanner/scanner.py | 162 +++++++++++++++------------------ tests/conftest.py | 6 +- 3 files changed, 93 insertions(+), 91 deletions(-) diff --git a/credsweeper/rules/rule.py b/credsweeper/rules/rule.py index 4a771f46b..d80b6c1ef 100644 --- a/credsweeper/rules/rule.py +++ b/credsweeper/rules/rule.py @@ -53,6 +53,7 @@ class Rule: # auxiliary fields USE_ML = "use_ml" REQUIRED_SUBSTRINGS = "required_substrings" + REQUIRED_REGEX = "required_regex" VALIDATIONS = "validations" def __init__(self, config: Config, rule_dict: Dict) -> None: @@ -75,6 +76,11 @@ def __init__(self, config: Config, rule_dict: Dict) -> None: self.__use_ml = bool(rule_dict.get(Rule.USE_ML)) self.__validations = self._get_validations(rule_dict.get(Rule.VALIDATIONS)) self.__required_substrings = [i.strip().lower() for i in rule_dict.get(Rule.REQUIRED_SUBSTRINGS, [])] + self.__has_required_substrings = bool(self.__required_substrings) + required_regex = rule_dict.get(Rule.REQUIRED_REGEX) + if required_regex and not isinstance(required_regex, str): + self._malformed_rule_error(rule_dict, Rule.REQUIRED_REGEX) + self.__required_regex = regex.compile(required_regex) if required_regex else None self.__min_line_len = int(rule_dict.get(Rule.MIN_LINE_LEN, MAX_LINE_LENGTH)) self.__usage_list: List[str] = rule_dict.get(Rule.USAGE_LIST, []) @@ -254,6 +260,16 @@ def required_substrings(self) -> List[str]: """required_substrings getter""" return self.__required_substrings + @cached_property + def has_required_substrings(self) -> bool: + """has_required_substrings getter for speedup""" + return self.__has_required_substrings + + @cached_property + def required_regex(self) -> Optional[re.Pattern]: + """required_regex getter""" + return self.__required_regex + @cached_property def min_line_len(self) -> int: """min_line_len getter""" diff --git a/credsweeper/scanner/scanner.py b/credsweeper/scanner/scanner.py index 530d8e169..5052d6616 100644 --- a/credsweeper/scanner/scanner.py +++ b/credsweeper/scanner/scanner.py @@ -1,10 +1,11 @@ import logging +import re from pathlib import Path -from typing import List, Optional, Type, Tuple, Dict, Union +from typing import List, Optional, Type, Tuple, Union, Dict from credsweeper.app import APP_PATH from credsweeper.common.constants import RuleType, MIN_VARIABLE_LENGTH, MIN_SEPARATOR_LENGTH, MIN_VALUE_LENGTH, \ - MAX_LINE_LENGTH, Separator + MAX_LINE_LENGTH from credsweeper.config import Config from credsweeper.credentials import Candidate from credsweeper.file_handler.analysis_target import AnalysisTarget @@ -31,17 +32,16 @@ class Scanner: def __init__(self, config: Config, rule_path: Optional[str], usage_list: Optional[List[str]] = None) -> None: self.config = config - self.__scanner_for_rule: Dict[str, Type[ScanType]] = {} - self.rules: List[Rule] = [] # init with MAX_LINE_LENGTH before _set_rules self.min_keyword_len = MAX_LINE_LENGTH self.min_pattern_len = MAX_LINE_LENGTH self.min_pem_key_len = MAX_LINE_LENGTH - self._set_rules(rule_path, usage_list if isinstance(usage_list, list) else ["src", "doc"]) + self.rules_scanners: List[Tuple[Rule, Type[ScanType]]] = [] + self._set_rules_scanners(rule_path, usage_list if isinstance(usage_list, list) else ["src", "doc"]) self.min_len = min(self.min_pattern_len, self.min_keyword_len, self.min_pem_key_len, MIN_VARIABLE_LENGTH + MIN_SEPARATOR_LENGTH + MIN_VALUE_LENGTH) - def _set_rules(self, rule_path: Union[None, str, Path], usage_list: List[str]) -> None: + def _set_rules_scanners(self, rule_path: Union[None, str, Path], usage_list: List[str]) -> None: """Auxiliary method to fill rules, determine min_pattern_len and set scanners""" if rule_path is None: rule_path = APP_PATH / "rules" / "config.yaml" @@ -51,7 +51,6 @@ def _set_rules(self, rule_path: Union[None, str, Path], usage_list: List[str]) - rule = Rule(self.config, rule_template) if not self._is_available(usage_list, rule): continue - self.rules.append(rule) if 0 < rule.min_line_len: if rule.rule_type == RuleType.KEYWORD: self.min_keyword_len = min(self.min_keyword_len, rule.min_line_len) @@ -61,52 +60,10 @@ def _set_rules(self, rule_path: Union[None, str, Path], usage_list: List[str]) - self.min_pem_key_len = min(self.min_pem_key_len, rule.min_line_len) else: logger.warning(f"Unknown rule type:{rule.rule_type}") - self.__scanner_for_rule[rule.rule_name] = self.get_scanner(rule) + self.rules_scanners.append((rule, self.get_scanner(rule))) else: raise RuntimeError(f"Wrong rules '{rule_templates}' were read from '{rule_path}'") - def _select_and_group_targets(self, targets: List[AnalysisTarget]) -> Tuple[TargetGroup, TargetGroup, TargetGroup]: - """Group targets into 3 lists based on loaded rules. - - Args: - targets: List of AnalysisTarget to analyze - - Return: - Three TargetGroup objects: one for keywords, one for patterns, and one for PEM keys - - """ - keyword_targets = [] - pattern_targets = [] - pem_targets = [] - - for target in targets: - # Ignore target if it's too long - if target.line_len > MAX_LINE_LENGTH: - logger.warning(f"Skipped oversize({target.line_len}) line in {target.file_path}:{target.line_num}", ) - continue - # Trim string from outer spaces to make future `a in str` checks faster - target_line_trimmed = target.line.strip() - target_line_trimmed_len = len(target_line_trimmed) - # Ignore target if trimmed part is too short - if target_line_trimmed_len < self.min_len: - continue - target_line_trimmed_lower = target_line_trimmed.lower() - # check minimal length for keyword rule - if target_line_trimmed_len >= self.min_keyword_len: - # Check if have at least one separator character. Otherwise cannot be matched by a keyword - for x in Separator.common_as_set: - if x in target_line_trimmed: - keyword_targets.append((target, target_line_trimmed_lower, target_line_trimmed_len)) - break - # Check if have length not smaller than smallest `min_line_len` in all pattern rules - if target_line_trimmed_len >= self.min_pattern_len: - pattern_targets.append((target, target_line_trimmed_lower, target_line_trimmed_len)) - # Check if have "BEGIN" substring. Cannot otherwise ba matched as a PEM key - if target_line_trimmed_len >= self.min_pem_key_len and "BEGIN" in target_line_trimmed: - pem_targets.append((target, target_line_trimmed_lower, target_line_trimmed_len)) - - return keyword_targets, pattern_targets, pem_targets - def _is_available(self, usage_list: List[str], rule: Rule) -> bool: """separate the method to reduce complexity""" if rule.severity < self.config.severity: @@ -117,13 +74,20 @@ def _is_available(self, usage_list: List[str], rule: Rule) -> bool: return False @staticmethod - def _required_substrings_not_present(required_substrings: List[str], line: str): + def _required_substrings_not_present(required_substrings: List[str], line: str) -> bool: """ returns True if required substring absent in line """ for substring in required_substrings: if substring in line: return False return True + @staticmethod + def _required_regex_not_matched(required_regex: re.Pattern, line: str) -> bool: + """ returns True if line does not matched required_regex """ + if required_regex.match(line): + return False + return True + def scan(self, targets: List[AnalysisTarget]) -> List[Candidate]: """Run scanning of list of target lines from 'targets' with set of rule from 'self.rules'. @@ -139,22 +103,68 @@ def scan(self, targets: List[AnalysisTarget]) -> List[Candidate]: if not targets: # optimization for empty list return credentials - keyword_targets, pattern_targets, pem_targets = self._select_and_group_targets(targets) - for rule in self.rules: - min_line_len = rule.min_line_len - required_substrings = rule.required_substrings - scanner = self.__scanner_for_rule[rule.rule_name] - to_check = self.get_targets_to_check(keyword_targets, pattern_targets, pem_targets, rule) - # It is almost two times faster to pre-compute values related to target_line than to compute them in - # each iteration - for target, target_line_trimmed_lower, target_line_trimmed_len in to_check: - if target_line_trimmed_len < min_line_len or required_substrings \ - and self._required_substrings_not_present(required_substrings, target_line_trimmed_lower): - continue + + for target in targets: + # Ignore target if it's too long + line_len = len(target.line) + if MAX_LINE_LENGTH < line_len: + logger.warning(f"Skipped oversize({line_len}) line in {target.file_path}:{target.line_num}") + continue + + # Trim string from outer spaces to make future `x in str` checks faster + target_line_stripped = target.line.strip() + target_line_stripped_len = len(target_line_stripped) + # Ignore target if stripped part is too short for all types + if target_line_stripped_len < self.min_len: + # the target does not satisfy any pattern type of all rules + continue + + # "cache" - YAPF and pycharm formatters ... + matched_keyword = \ + target_line_stripped_len >= self.min_keyword_len and ( # + '=' in target_line_stripped or ':' in target_line_stripped) # + matched_pem_key = \ + target_line_stripped_len >= self.min_pem_key_len and "-----BEGIN" in target_line_stripped \ + and "PRIVATE" in target_line_stripped + matched_pattern = target_line_stripped_len >= self.min_pattern_len + + if not (matched_keyword or matched_pem_key or matched_pattern): + continue + + # use lower case for required substring + target_line_stripped_lower = target_line_stripped.lower() + # cached value to skip the same regex verifying + matched_regex: Dict[regex.Pattern, bool] = {} + + for rule, scanner in [ # + (rule, scanner) for (rule, scanner) in self.rules_scanners \ + if target_line_stripped_len >= rule.min_line_len and ( # + RuleType.PATTERN == rule.rule_type and matched_pattern # + or RuleType.KEYWORD == rule.rule_type and matched_keyword # + or RuleType.PEM_KEY == rule.rule_type and matched_pem_key # + ) # + ]: + for substring in rule.required_substrings: + if substring in target_line_stripped_lower: + break + else: + if rule.has_required_substrings: + continue + + # common regex might be triggered for the same target + if rule.required_regex: + if rule.required_regex in matched_regex: + regex_result = matched_regex[rule.required_regex] + else: + regex_result = bool(rule.required_regex.search(target.line)) + matched_regex[rule.required_regex] = regex_result + if not regex_result: + continue + if new_credential := scanner.run(self.config, rule, target): + credentials.append(new_credential) logger.debug("Credential for rule: %s in file: %s:%d in line: %s", rule.rule_name, target.file_path, target.line_num, target.line) - credentials.append(new_credential) return credentials @staticmethod @@ -175,27 +185,3 @@ def get_scanner(rule: Rule) -> Type[ScanType]: elif rule.pattern_type == Rule.PEM_KEY_PATTERN: return PemKeyPattern raise ValueError(f"Unknown pattern_type in rule: {rule.pattern_type}") - - @staticmethod - def get_targets_to_check(keyword_targets: TargetGroup, pattern_targets: TargetGroup, pem_targets: TargetGroup, - rule: Rule) -> TargetGroup: - """Choose target subset based on a rule. - - Args: - keyword_targets: TargetGroup with targets relevant to a keyword based rules - pattern_targets: TargetGroup with targets relevant to a pattern based rules - pem_targets: TargetGroup with targets relevant to a pem key rules - rule: rule object used to scanning - - Return: - depending on the rule type, returns one of the other arguments - - """ - if rule.rule_type == RuleType.KEYWORD: - return keyword_targets - elif rule.rule_type == RuleType.PATTERN: - return pattern_targets - elif rule.rule_type == RuleType.PEM_KEY: - return pem_targets - else: - raise ValueError(f"Unknown RuleType {rule.rule_type}") diff --git a/tests/conftest.py b/tests/conftest.py index 520c2d3a4..a1fbbdd37 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -50,7 +50,7 @@ def config() -> Config: @pytest.fixture def rule(rule_name: str, config: Config, rule_path: str) -> Optional[Rule]: scanner = Scanner(config, rule_path) - for rule in scanner.rules: + for rule, scanner in scanner.rules_scanners: if rule.rule_name == rule_name: return rule return None @@ -64,7 +64,7 @@ def rule_path() -> str: @pytest.fixture def scanner(rule: Rule, config: Config, rule_path: str) -> Scanner: scanner = Scanner(config, rule_path) - scanner.rules = [rule] + scanner.rules_scanners = [(rule, Scanner.get_scanner(rule))] return scanner @@ -72,5 +72,5 @@ def scanner(rule: Rule, config: Config, rule_path: str) -> Scanner: def scanner_without_filters(rule: Rule, config: Config, rule_path: str): config.use_filters = False scanner = Scanner(config, rule_path) - scanner.rules = [rule] + scanner.rules_scanners = [(rule, Scanner.get_scanner(rule))] return scanner From 03043bdcd0bcf0ca357453f9796966c8faec924e Mon Sep 17 00:00:00 2001 From: Roman Babenko Date: Mon, 19 Jun 2023 18:08:28 +0300 Subject: [PATCH 2/2] fix --- credsweeper/rules/rule.py | 2 +- credsweeper/scanner/scanner.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/credsweeper/rules/rule.py b/credsweeper/rules/rule.py index d80b6c1ef..b9b17bfea 100644 --- a/credsweeper/rules/rule.py +++ b/credsweeper/rules/rule.py @@ -80,7 +80,7 @@ def __init__(self, config: Config, rule_dict: Dict) -> None: required_regex = rule_dict.get(Rule.REQUIRED_REGEX) if required_regex and not isinstance(required_regex, str): self._malformed_rule_error(rule_dict, Rule.REQUIRED_REGEX) - self.__required_regex = regex.compile(required_regex) if required_regex else None + self.__required_regex = re.compile(required_regex) if required_regex else None self.__min_line_len = int(rule_dict.get(Rule.MIN_LINE_LEN, MAX_LINE_LENGTH)) self.__usage_list: List[str] = rule_dict.get(Rule.USAGE_LIST, []) diff --git a/credsweeper/scanner/scanner.py b/credsweeper/scanner/scanner.py index 5052d6616..564c2d1e0 100644 --- a/credsweeper/scanner/scanner.py +++ b/credsweeper/scanner/scanner.py @@ -134,7 +134,7 @@ def scan(self, targets: List[AnalysisTarget]) -> List[Candidate]: # use lower case for required substring target_line_stripped_lower = target_line_stripped.lower() # cached value to skip the same regex verifying - matched_regex: Dict[regex.Pattern, bool] = {} + matched_regex: Dict[re.Pattern, bool] = {} for rule, scanner in [ # (rule, scanner) for (rule, scanner) in self.rules_scanners \