Skip to content

Commit

Permalink
[skip actions] [optimization] 2023-07-16T13:21:46+03:00
Browse files Browse the repository at this point in the history
  • Loading branch information
babenek committed Jul 16, 2023
1 parent 614416a commit 40490ae
Show file tree
Hide file tree
Showing 9 changed files with 114 additions and 54 deletions.
2 changes: 1 addition & 1 deletion credsweeper/credentials/candidate.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ def to_dict_list(self) -> List[dict]:
def get_dummy_candidate(cls, config: Config, file_path: str, file_type: str, info: str):
"""Create dummy instance to use in searching file by extension"""
return cls( #
line_data_list=[LineData(config, "dummy line", -1, file_path, file_type, info, re.compile(".*"))],
line_data_list=[LineData(config, "dummy line", -1,0, file_path, file_type, info, re.compile(".*"))],
patterns=[re.compile(".*")], #
rule_name="Dummy candidate", #
severity=Severity.INFO, #
Expand Down
12 changes: 12 additions & 0 deletions credsweeper/credentials/line_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ def __init__(
self, #
config: Config, #
line: str, #
line_pos: int, #
line_num: int, #
path: str, #
file_type: str, #
Expand All @@ -40,6 +41,7 @@ def __init__(
self.config = config
self.key: Optional[str] = None
self.line: str = line
self.line_pos: int = line_pos
self.line_num: int = line_num
self.path: str = path
self.file_type: str = file_type
Expand Down Expand Up @@ -80,6 +82,16 @@ def line_len(self) -> int:
"""line_len getter"""
return len(self.__line)

@property
def line_pos(self) -> int:
"""line_pos getter"""
return self.__line_pos

@line_pos.setter
def line_pos(self, line_pos: int) -> None:
"""line_pos setter"""
self.__line_pos = line_pos

@property
def line_num(self) -> int:
"""line_num getter"""
Expand Down
61 changes: 50 additions & 11 deletions credsweeper/file_handler/analysis_target.py
Original file line number Diff line number Diff line change
@@ -1,36 +1,75 @@
from dataclasses import dataclass
from functools import cached_property
from typing import List, Optional

from credsweeper.file_handler.descriptor import Descriptor


@dataclass(frozen=True)
class AnalysisTarget:
"""AnalysisTarget"""
line: str
line_num: int
lines: List[str]
descriptor: Descriptor

def __init__(self,
line_pos: int,
lines: List[str],
line_nums: List[int],
descriptor: Descriptor,
):
self.__line_pos = line_pos
self.__lines = lines
self.__line_nums = line_nums
self.__descriptor = descriptor

@cached_property
def line(self) -> str:
"""cached value"""
return self.__lines[self.__line_pos]

@cached_property
def line_len(self) -> int:
"""cached value"""
return len(self.line)
# index the list directly instead of going through "line", so a skipped line is not cached
return len(self.__lines[self.__line_pos])

@cached_property
def lines(self) -> List[str]:
"""cached value"""
return self.__lines

@cached_property
def lines_len(self) -> int:
"""cached value"""
return len(self.lines)
return len(self.__lines)

@cached_property
def line_pos(self) -> int:
"""cached value"""
return self.__line_pos

@cached_property
def line_num(self) -> int:
"""cached value"""
return self.__line_nums[self.__line_pos]

@cached_property
def line_nums(self) -> List[int]:
"""cached value"""
return self.__line_nums

@cached_property
def file_path(self) -> Optional[str]:
return self.descriptor.path
"""cached value"""
return self.__descriptor.path

@cached_property
def file_type(self) -> Optional[str]:
return self.descriptor.extension
"""cached value"""
return self.__descriptor.extension

@cached_property
def info(self) -> Optional[str]:
return self.descriptor.info
"""cached value"""
return self.__descriptor.info

@cached_property
def descriptor(self) -> Descriptor:
"""cached value"""
return self.__descriptor
16 changes: 11 additions & 5 deletions credsweeper/file_handler/content_provider.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import logging
from abc import ABC, abstractmethod
from functools import cached_property
from typing import List, Optional
Expand All @@ -6,6 +7,8 @@
from credsweeper.file_handler.descriptor import Descriptor
from credsweeper.utils import Util

logger = logging.getLogger(__name__)


class ContentProvider(ABC):
"""Base class to provide access to analysis targets for scanned object."""
Expand Down Expand Up @@ -72,12 +75,15 @@ def data(self, data: Optional[bytes]) -> None:
def lines_to_targets(self, lines: List[str], line_nums: Optional[List[int]] = None) -> List[AnalysisTarget]:
"""Creates list of targets with multiline concatenation"""
targets = []
if line_nums:
for line, line_num in zip(lines, line_nums):
target = AnalysisTarget(line, line_num, lines, self.descriptor)
if line_nums and len(line_nums) == len(lines):
for line_pos in range(len(lines)):
target = AnalysisTarget(line_pos, lines, line_nums, self.descriptor)
targets.append(target)
else:
for i, line in enumerate(lines):
target = AnalysisTarget(line, i + 1, lines, self.descriptor)
if line_nums and len(line_nums) != len(lines):
logger.warning(f"line numerations {len(line_nums)} does not match lines {len(lines)}")
_line_nums = [x for x in range(len(lines))]
for line_pos in range(len(lines)):
target = AnalysisTarget(line_pos, lines, _line_nums, self.descriptor)
targets.append(target)
return targets
14 changes: 5 additions & 9 deletions credsweeper/file_handler/diff_content_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,16 +59,12 @@ def parse_lines_data(self, lines_data: List[DiffRowData]) -> Tuple[List[int], Li
in original order(replaced all lines not mentioned in diff file with blank line)
"""
max_line_numbs = max(x.line_numb for x in lines_data) if lines_data else 0
# handle the case when whatthepatch parses a patch wrongly - some exceptions are possible
max_line_numbs = max(max_line_numbs, len(lines_data))
all_lines = [""] * max_line_numbs
change_numbs = []
all_lines = []
for line_data in lines_data:
if line_data.line_type.value.startswith(self.change_type.value):
all_lines[line_data.line_numb - 1] = line_data.line
if line_data.line_type == self.change_type:
change_numbs.append(line_data.line_numb)
all_lines.append(line_data.line)
return change_numbs, all_lines

def get_analysis_target(self) -> List[AnalysisTarget]:
Expand All @@ -83,11 +79,11 @@ def get_analysis_target(self) -> List[AnalysisTarget]:
change_numbs, all_lines = self.parse_lines_data(lines_data)
return [
AnalysisTarget(
all_lines[l_numb - 1], #
l_numb, #
l_pos, #
all_lines, #
change_numbs, #
self.descriptor) #
for l_numb in change_numbs
for l_pos in range(len(change_numbs))
]
except Exception as exc:
logger.error(f"Wrong diff {type(exc)} {exc}")
Expand Down
4 changes: 2 additions & 2 deletions credsweeper/file_handler/string_content_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,6 @@ def get_analysis_target(self) -> List[AnalysisTarget]:
"""
return [
AnalysisTarget(line, line_number, self.lines, self.descriptor)
for line_number, line in zip(self.line_numbers, self.lines)
AnalysisTarget(line_pos, self.lines, self.line_numbers, self.descriptor)
for line_pos in range(len(self.lines))
]
28 changes: 12 additions & 16 deletions credsweeper/scanner/scan_type/multi_pattern.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from typing import Optional

from credsweeper.common.constants import MAX_LINE_LENGTH
from credsweeper.config import Config
from credsweeper.credentials import Candidate
from credsweeper.file_handler.analysis_target import AnalysisTarget
Expand Down Expand Up @@ -39,16 +38,18 @@ def run(cls, config: Config, rule: Rule, target: AnalysisTarget) -> Optional[Can
if not candidate:
return None

line_num_margin = 1
line_pos_margin = 1

while line_num_margin <= cls.MAX_SEARCH_MARGIN:
if 1 <= candidate.line_data_list[0].line_num - line_num_margin <= len(target.lines):
if cls._scan(config, candidate, -line_num_margin, target, rule):
while line_pos_margin <= cls.MAX_SEARCH_MARGIN:
candi_line_pos_backward = candidate.line_data_list[0].line_pos - line_pos_margin
if 0 <= candi_line_pos_backward < target.lines_len:
if cls._scan(config, candidate, candi_line_pos_backward, target, rule):
break
if candidate.line_data_list[0].line_num + line_num_margin <= len(target.lines):
if cls._scan(config, candidate, line_num_margin, target, rule):
candi_line_pos_forward = candidate.line_data_list[0].line_pos + line_pos_margin
if candi_line_pos_forward < target.lines_len:
if cls._scan(config, candidate, candi_line_pos_forward, target, rule):
break
line_num_margin += 1
line_pos_margin += 1

# Check if found multi line
if len(candidate.line_data_list) == 1:
Expand All @@ -57,7 +58,7 @@ def run(cls, config: Config, rule: Rule, target: AnalysisTarget) -> Optional[Can
return candidate

@classmethod
def _scan(cls, config: Config, candidate: Candidate, line_num_margin: int, target: AnalysisTarget,
def _scan(cls, config: Config, candidate: Candidate, candi_line_pos: int, target: AnalysisTarget,
rule: Rule) -> bool:
"""Search for second part of multiline rule near the current line.
Expand All @@ -66,20 +67,15 @@ def _scan(cls, config: Config, candidate: Candidate, line_num_margin: int, targe
Args:
config: dict, scanner configuration
candidate: Current credential candidate detected in the line
line_num_margin: Number of lines around candidate to perform search
candi_line_pos: Position in target.lines of the neighbouring line to search
target: Analysis target
rule: Rule object to check current line. Should be a multi-pattern rule
Return:
Boolean. True if second part detected. False otherwise
"""
candi_line_num = candidate.line_data_list[0].line_num + line_num_margin
candi_line = target.lines[candi_line_num - 1]
if MAX_LINE_LENGTH < len(candi_line):
return False
# lines are not necessary - skip them
new_target = AnalysisTarget(candi_line, candi_line_num, [], target.descriptor)
new_target = AnalysisTarget(candi_line_pos, target.lines, target.line_nums, target.descriptor)
line_data = cls.get_line_data(config=config, target=new_target, pattern=rule.patterns[1], filters=rule.filters)

if line_data is None:
Expand Down
23 changes: 14 additions & 9 deletions credsweeper/scanner/scan_type/pem_key_pattern.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,20 +76,25 @@ def detect_pem_key(cls, config: Config, rule: Rule, target: AnalysisTarget) -> L
line_data: List[LineData] = []
key_data = ""
# get line with -----BEGIN which may contain full key
first_line = LineData(config, target.line, target.line_num, target.file_path, target.file_type, target.info,
first_line = LineData(config, target.line, target.line_pos, target.line_num, target.file_path, target.file_type,
target.info,
rule.patterns[0])
line_data.append(first_line)
# protection check for case when first line starts from 0
line_num = target.line_num if 0 < target.line_num else 1
finish_line = line_num + 200
for line in target.lines[line_num - 1:]:
if finish_line < line_num:
return []
if 1 != line_num and target.line_num != line_num:
_line = LineData(config, line, line_num, target.file_path, target.file_type, target.info,
start_pos = target.line_pos if 0 <= target.line_pos else 0
finish_pos = min(start_pos + 200, target.lines_len)
for line_pos in range(start_pos, finish_pos):
line = target.lines[line_pos]
if target.line_pos != line_pos:
_line = LineData(config, #
line, #
line_pos, #
target.line_nums[line_pos], #
target.file_path, #
target.file_type, #
target.info, #
cls.re_value_pem)
line_data.append(_line)
line_num += 1
# replace escaped line endings with real ones and process them - PEM does not contain the '\' character
sublines = line.replace("\\r", '\n').replace("\\n", '\n').splitlines()
for subline in sublines:
Expand Down
8 changes: 7 additions & 1 deletion credsweeper/scanner/scan_type/scan_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,13 @@ def get_line_data(
return None
logger.debug("Valid line for pattern: %s in file: %s:%d in line: %s", pattern, target.file_path,
target.line_num, target.line)
line_data = LineData(config, target.line, target.line_num, target.file_path, target.file_type, target.info,
line_data = LineData(config, #
target.line, #
target.line_pos, #
target.line_num, #
target.file_path, #
target.file_type, #
target.info, #
pattern)

if cls.filtering(config, target, line_data, filters):
Expand Down

0 comments on commit 40490ae

Please sign in to comment.