Skip to content

Commit

Permalink
Merge branch 'refactoring' into optimization
Browse files Browse the repository at this point in the history
  • Loading branch information
babenek committed Jul 16, 2023
2 parents 40490ae + 03043bd commit 5dc1ba0
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 90 deletions.
16 changes: 16 additions & 0 deletions credsweeper/rules/rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ class Rule:
# auxiliary fields
USE_ML = "use_ml"
REQUIRED_SUBSTRINGS = "required_substrings"
REQUIRED_REGEX = "required_regex"
VALIDATIONS = "validations"

def __init__(self, config: Config, rule_dict: Dict) -> None:
Expand All @@ -75,6 +76,11 @@ def __init__(self, config: Config, rule_dict: Dict) -> None:
self.__use_ml = bool(rule_dict.get(Rule.USE_ML))
self.__validations = self._get_validations(rule_dict.get(Rule.VALIDATIONS))
self.__required_substrings = [i.strip().lower() for i in rule_dict.get(Rule.REQUIRED_SUBSTRINGS, [])]
self.__has_required_substrings = bool(self.__required_substrings)
required_regex = rule_dict.get(Rule.REQUIRED_REGEX)
if required_regex and not isinstance(required_regex, str):
self._malformed_rule_error(rule_dict, Rule.REQUIRED_REGEX)
self.__required_regex = re.compile(required_regex) if required_regex else None
self.__min_line_len = int(rule_dict.get(Rule.MIN_LINE_LEN, MAX_LINE_LENGTH))
self.__usage_list: List[str] = rule_dict.get(Rule.USAGE_LIST, [])

Expand Down Expand Up @@ -254,6 +260,16 @@ def required_substrings(self) -> List[str]:
"""required_substrings getter"""
return self.__required_substrings

@cached_property
def has_required_substrings(self) -> bool:
"""has_required_substrings getter for speedup"""
return self.__has_required_substrings

@cached_property
def required_regex(self) -> Optional[re.Pattern]:
"""required_regex getter"""
return self.__required_regex

@cached_property
def min_line_len(self) -> int:
"""min_line_len getter"""
Expand Down
160 changes: 73 additions & 87 deletions credsweeper/scanner/scanner.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logging
import re
from pathlib import Path
from typing import List, Optional, Type, Tuple, Dict, Union
from typing import List, Optional, Type, Tuple, Union, Dict

from credsweeper.app import APP_PATH
from credsweeper.common.constants import RuleType, MIN_VARIABLE_LENGTH, MIN_SEPARATOR_LENGTH, MIN_VALUE_LENGTH, \
Expand Down Expand Up @@ -31,17 +32,16 @@ class Scanner:

def __init__(self, config: Config, rule_path: Optional[str], usage_list: Optional[List[str]] = None) -> None:
self.config = config
self.__scanner_for_rule: Dict[str, Type[ScanType]] = {}
self.rules: List[Rule] = []
# init with MAX_LINE_LENGTH before _set_rules
self.min_keyword_len = MAX_LINE_LENGTH
self.min_pattern_len = MAX_LINE_LENGTH
self.min_pem_key_len = MAX_LINE_LENGTH
self._set_rules(rule_path, usage_list if isinstance(usage_list, list) else ["src", "doc"])
self.rules_scanners: List[Tuple[Rule, Type[ScanType]]] = []
self._set_rules_scanners(rule_path, usage_list if isinstance(usage_list, list) else ["src", "doc"])
self.min_len = min(self.min_pattern_len, self.min_keyword_len, self.min_pem_key_len,
MIN_VARIABLE_LENGTH + MIN_SEPARATOR_LENGTH + MIN_VALUE_LENGTH)

def _set_rules(self, rule_path: Union[None, str, Path], usage_list: List[str]) -> None:
def _set_rules_scanners(self, rule_path: Union[None, str, Path], usage_list: List[str]) -> None:
"""Auxiliary method to fill rules, determine min_pattern_len and set scanners"""
if rule_path is None:
rule_path = APP_PATH / "rules" / "config.yaml"
Expand All @@ -51,7 +51,6 @@ def _set_rules(self, rule_path: Union[None, str, Path], usage_list: List[str]) -
rule = Rule(self.config, rule_template)
if not self._is_available(usage_list, rule):
continue
self.rules.append(rule)
if 0 < rule.min_line_len:
if rule.rule_type == RuleType.KEYWORD:
self.min_keyword_len = min(self.min_keyword_len, rule.min_line_len)
Expand All @@ -61,52 +60,10 @@ def _set_rules(self, rule_path: Union[None, str, Path], usage_list: List[str]) -
self.min_pem_key_len = min(self.min_pem_key_len, rule.min_line_len)
else:
logger.warning(f"Unknown rule type:{rule.rule_type}")
self.__scanner_for_rule[rule.rule_name] = self.get_scanner(rule)
self.rules_scanners.append((rule, self.get_scanner(rule)))
else:
raise RuntimeError(f"Wrong rules '{rule_templates}' were read from '{rule_path}'")

def _select_and_group_targets(self, targets: List[AnalysisTarget]) -> Tuple[TargetGroup, TargetGroup, TargetGroup]:
"""Group targets into 3 lists based on loaded rules.
Args:
targets: List of AnalysisTarget to analyze
Return:
Three TargetGroup objects: one for keywords, one for patterns, and one for PEM keys
"""
keyword_targets = []
pattern_targets = []
pem_targets = []

for target in targets:
# Ignore target if it's too long
if target.line_len > MAX_LINE_LENGTH:
logger.warning(f"Skipped oversize({target.line_len}) line in {target.file_path}:{target.line_num}", )
continue
# Trim string from outer spaces to make future `a in str` checks faster
target_line_trimmed = target.line.strip()
target_line_trimmed_len = len(target_line_trimmed)
# Ignore target if trimmed part is too short
if target_line_trimmed_len < self.min_len:
continue
target_line_trimmed_lower = target_line_trimmed.lower()
# check minimal length for keyword rule
if target_line_trimmed_len >= self.min_keyword_len:
# Check if have at least one separator character. Otherwise cannot be matched by a keyword
for x in Separator.common_as_set:
if x in target_line_trimmed:
keyword_targets.append((target, target_line_trimmed_lower, target_line_trimmed_len))
break
# Check if have length not smaller than smallest `min_line_len` in all pattern rules
if target_line_trimmed_len >= self.min_pattern_len:
pattern_targets.append((target, target_line_trimmed_lower, target_line_trimmed_len))
# Check if have "BEGIN" substring. Cannot otherwise ba matched as a PEM key
if target_line_trimmed_len >= self.min_pem_key_len and PEM_BEGIN_PATTERN in target_line_trimmed:
pem_targets.append((target, target_line_trimmed_lower, target_line_trimmed_len))

return keyword_targets, pattern_targets, pem_targets

def _is_available(self, usage_list: List[str], rule: Rule) -> bool:
"""separate the method to reduce complexity"""
if rule.severity < self.config.severity:
Expand All @@ -117,13 +74,20 @@ def _is_available(self, usage_list: List[str], rule: Rule) -> bool:
return False

@staticmethod
def _required_substrings_not_present(required_substrings: List[str], line: str):
def _required_substrings_not_present(required_substrings: List[str], line: str) -> bool:
""" returns True if required substring absent in line """
for substring in required_substrings:
if substring in line:
return False
return True

@staticmethod
def _required_regex_not_matched(required_regex: re.Pattern, line: str) -> bool:
""" returns True if line does not matched required_regex """
if required_regex.match(line):
return False
return True

def scan(self, targets: List[AnalysisTarget]) -> List[Candidate]:
"""Run scanning of list of target lines from 'targets' with set of rule from 'self.rules'.
Expand All @@ -139,22 +103,68 @@ def scan(self, targets: List[AnalysisTarget]) -> List[Candidate]:
if not targets:
# optimization for empty list
return credentials
keyword_targets, pattern_targets, pem_targets = self._select_and_group_targets(targets)
for rule in self.rules:
min_line_len = rule.min_line_len
required_substrings = rule.required_substrings
scanner = self.__scanner_for_rule[rule.rule_name]
to_check = self.get_targets_to_check(keyword_targets, pattern_targets, pem_targets, rule)
# It is almost two times faster to pre-compute values related to target_line than to compute them in
# each iteration
for target, target_line_trimmed_lower, target_line_trimmed_len in to_check:
if target_line_trimmed_len < min_line_len or required_substrings \
and self._required_substrings_not_present(required_substrings, target_line_trimmed_lower):
continue

for target in targets:
# Ignore target if it's too long
line_len = len(target.line)
if MAX_LINE_LENGTH < line_len:
logger.warning(f"Skipped oversize({line_len}) line in {target.file_path}:{target.line_num}")
continue

# Trim string from outer spaces to make future `x in str` checks faster
target_line_stripped = target.line.strip()
target_line_stripped_len = len(target_line_stripped)
# Ignore target if stripped part is too short for all types
if target_line_stripped_len < self.min_len:
# the target does not satisfy any pattern type of all rules
continue

# "cache" - YAPF and pycharm formatters ...
matched_keyword = \
target_line_stripped_len >= self.min_keyword_len and ( #
'=' in target_line_stripped or ':' in target_line_stripped) #
matched_pem_key = \
target_line_stripped_len >= self.min_pem_key_len and "-----BEGIN" in target_line_stripped \
and "PRIVATE" in target_line_stripped
matched_pattern = target_line_stripped_len >= self.min_pattern_len

if not (matched_keyword or matched_pem_key or matched_pattern):
continue

# use lower case for required substring
target_line_stripped_lower = target_line_stripped.lower()
# cached value to skip the same regex verifying
matched_regex: Dict[re.Pattern, bool] = {}

for rule, scanner in [ #
(rule, scanner) for (rule, scanner) in self.rules_scanners \
if target_line_stripped_len >= rule.min_line_len and ( #
RuleType.PATTERN == rule.rule_type and matched_pattern #
or RuleType.KEYWORD == rule.rule_type and matched_keyword #
or RuleType.PEM_KEY == rule.rule_type and matched_pem_key #
) #
]:
for substring in rule.required_substrings:
if substring in target_line_stripped_lower:
break
else:
if rule.has_required_substrings:
continue

# common regex might be triggered for the same target
if rule.required_regex:
if rule.required_regex in matched_regex:
regex_result = matched_regex[rule.required_regex]
else:
regex_result = bool(rule.required_regex.search(target.line))
matched_regex[rule.required_regex] = regex_result
if not regex_result:
continue

if new_credential := scanner.run(self.config, rule, target):
credentials.append(new_credential)
logger.debug("Credential for rule: %s in file: %s:%d in line: %s", rule.rule_name, target.file_path,
target.line_num, target.line)
credentials.append(new_credential)
return credentials

@staticmethod
Expand All @@ -175,27 +185,3 @@ def get_scanner(rule: Rule) -> Type[ScanType]:
elif rule.pattern_type == Rule.PEM_KEY_PATTERN:
return PemKeyPattern
raise ValueError(f"Unknown pattern_type in rule: {rule.pattern_type}")

@staticmethod
def get_targets_to_check(keyword_targets: TargetGroup, pattern_targets: TargetGroup, pem_targets: TargetGroup,
rule: Rule) -> TargetGroup:
"""Choose target subset based on a rule.
Args:
keyword_targets: TargetGroup with targets relevant to a keyword based rules
pattern_targets: TargetGroup with targets relevant to a pattern based rules
pem_targets: TargetGroup with targets relevant to a pem key rules
rule: rule object used to scanning
Return:
depending on the rule type, returns one of the other arguments
"""
if rule.rule_type == RuleType.KEYWORD:
return keyword_targets
elif rule.rule_type == RuleType.PATTERN:
return pattern_targets
elif rule.rule_type == RuleType.PEM_KEY:
return pem_targets
else:
raise ValueError(f"Unknown RuleType {rule.rule_type}")
6 changes: 3 additions & 3 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def config() -> Config:
@pytest.fixture
def rule(rule_name: str, config: Config, rule_path: str) -> Optional[Rule]:
scanner = Scanner(config, rule_path)
for rule in scanner.rules:
for rule, scanner in scanner.rules_scanners:
if rule.rule_name == rule_name:
return rule
return None
Expand All @@ -64,13 +64,13 @@ def rule_path() -> str:
@pytest.fixture
def scanner(rule: Rule, config: Config, rule_path: str) -> Scanner:
scanner = Scanner(config, rule_path)
scanner.rules = [rule]
scanner.rules_scanners = [(rule, Scanner.get_scanner(rule))]
return scanner


@pytest.fixture
def scanner_without_filters(rule: Rule, config: Config, rule_path: str):
config.use_filters = False
scanner = Scanner(config, rule_path)
scanner.rules = [rule]
scanner.rules_scanners = [(rule, Scanner.get_scanner(rule))]
return scanner

0 comments on commit 5dc1ba0

Please sign in to comment.