Commit

softreset
babenek committed May 30, 2024
1 parent ffe74dd commit 7759771
Showing 11 changed files with 201 additions and 709 deletions.
1 change: 1 addition & 0 deletions credsweeper/filters/__init__.py
@@ -26,6 +26,7 @@
 from credsweeper.filters.value_ip_check import ValueIPCheck
 from credsweeper.filters.value_jfrog_token_check import ValueJfrogTokenCheck
 from credsweeper.filters.value_json_web_token_check import ValueJsonWebTokenCheck
+from credsweeper.filters.value_jwt_check import ValueJwtCheck
 from credsweeper.filters.value_last_word_check import ValueLastWordCheck
 from credsweeper.filters.value_length_check import ValueLengthCheck
 from credsweeper.filters.value_method_check import ValueMethodCheck
12 changes: 5 additions & 7 deletions credsweeper/filters/value_entropy_base64_check.py
@@ -1,6 +1,6 @@
 import math
 
-from credsweeper.common.constants import Chars, ENTROPY_LIMIT_BASE64
+from credsweeper.common.constants import Chars
 from credsweeper.config import Config
 from credsweeper.credentials import LineData
 from credsweeper.file_handler.analysis_target import AnalysisTarget
@@ -45,14 +45,12 @@ def get_min_data_entropy(x: int) -> float:
             y = 4.1
         elif 32 == x:
             y = 4.4
-        elif 12 <= x < 35:
+        elif 12 <= x < 32:
             # logarithm base 2 - slow, but precise. Approximation does not exceed stdev
             y = 0.77 * math.log2(x) + 0.62
-        elif 35 <= x < 60:
-            y = ENTROPY_LIMIT_BASE64
-        elif 60 <= x:
-            # the entropy grows slowly after 60
-            y = 5.0
+        elif 32 < x:
+            l2x = math.log2(x)
+            y = 0.001477 * l2x ** 4 - 0.036886 * l2x ** 3 + 0.244849 * l2x ** 2 + 0.318411 * l2x + 0.3932
         else:
             y = 0
         return y
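The new closed-form branch replaces the flat ENTROPY_LIMIT_BASE64 plateau with a quartic polynomial in log2(x); the coefficients come from the curve fit in experiment/src/entropy_test.py further down in this diff. A quick standalone sanity check of the rounded coefficients (an illustrative sketch, not part of the commit; min_entropy_approx is a made-up name):

    import math

    def min_entropy_approx(x: int) -> float:
        # Rounded coefficients from the curve fit below: minimal expected
        # Shannon entropy of base64-encoded random data of length x (x > 32)
        l2x = math.log2(x)
        return 0.001477 * l2x ** 4 - 0.036886 * l2x ** 3 + 0.244849 * l2x ** 2 + 0.318411 * l2x + 0.3932

    print(min_entropy_approx(33))  # ~4.45, just above the 4.4 pinned for x == 32
    print(min_entropy_approx(60))  # ~5.01, close to the old flat limit of 5.0 for x >= 60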
81 changes: 81 additions & 0 deletions credsweeper/filters/value_jwt_check.py
@@ -0,0 +1,81 @@
+import contextlib
+import json
+import math
+import string
+from typing import Any
+
+from credsweeper.common.constants import Chars
+from credsweeper.config import Config
+from credsweeper.credentials import LineData
+from credsweeper.file_handler.analysis_target import AnalysisTarget
+from credsweeper.filters import Filter, ValueEntropyBase64Check
+from credsweeper.utils import Util
+
+
+class ValueJwtCheck(Filter):
+    """JWT token check - simple"""
+    ASCII_NON_WHITESPACES = string.ascii_letters + string.digits + string.punctuation
+    JWT_KEYS = {"alg", "typ", "key", "password"}
+
+    def __init__(self, config: Config = None) -> None:
+        pass
+
+    def run(self, line_data: LineData, target: AnalysisTarget) -> bool:
+        """Run filter checks on received token which might be a JWT.
+
+        Args:
+            line_data: credential candidate data
+            target: multiline target from which line data was obtained
+
+        Return:
+            True, when the candidate must be filtered, False otherwise
+
+        """
+        if not line_data.value:
+            return True
+        probability = 0.0
+        with contextlib.suppress(Exception):
+            for part in line_data.value.split('.'):
+                if part.startswith("eyJ"):
+                    decoded = Util.decode_base64(part, padding_safe=True, urlsafe_detect=True)
+                    if part_data := json.loads(decoded):
+                        probability += ValueJwtCheck.check_jwt_recursive(part_data)
+                    else:
+                        # broken jwt
+                        break
+                elif part:
+                    entropy = Util.get_shannon_entropy(part, Chars.BASE64URL_CHARS.value)
+                    # JWT auxiliary parts contain encrypted data which have less entropy than random
+                    len_part = len(part)
+                    min_entropy = ValueEntropyBase64Check.get_min_data_entropy(len_part) - 1 / math.log(len_part)
+                    if min_entropy < entropy:
+                        probability += 0.5
+            else:
+                # all parts passed the test
+                return 0.5 > probability
+        return True
+
+    @staticmethod
+    def check_jwt_recursive(data: Any) -> float:
+        """Recursive check for jwt is safe because jwt has no references in data structure"""
+        result = 0.0
+        if isinstance(data, list):
+            for i in data:
+                result += ValueJwtCheck.check_jwt_recursive(i)
+        elif isinstance(data, dict):
+            for k, v in data.items():
+                if k in ValueJwtCheck.JWT_KEYS:
+                    result += 0.33
+                result += ValueJwtCheck.check_jwt_recursive(v)
+        elif isinstance(data, str) and 32 <= len(data):
+            len_data = len(data)
+            # encoded/encrypted values may have less entropy than randomly generated ones
+            min_entropy = ValueEntropyBase64Check.get_min_data_entropy(len_data) - 1 / math.log(len_data)
+            entropy = Util.get_shannon_entropy(data, ValueJwtCheck.ASCII_NON_WHITESPACES)
+            if min_entropy < entropy:
+                result = 0.5
+        else:
+            # float, integer, none aren't analyzed
+            pass
+        return result
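In short, the filter splits the candidate on '.', JSON-decodes every part that starts with eyJ (the base64 encoding of a '{"' prefix), scores well-known JWT header keys, and entropy-checks the remaining parts. The sketch below mirrors that flow with the standard library only: shannon_entropy and decode_part are hypothetical stand-ins for the CredSweeper helpers Util.get_shannon_entropy and Util.decode_base64, and the token is the well-known jwt.io demo value:

    import base64
    import json
    import math
    from collections import Counter

    def shannon_entropy(data: str) -> float:
        # Bits per symbol; a rough stand-in for Util.get_shannon_entropy
        counts = Counter(data)
        size = len(data)
        return -sum(c / size * math.log2(c / size) for c in counts.values())

    def decode_part(part: str) -> dict:
        # Base64url-decode a JWT part, restoring the stripped '=' padding
        return json.loads(base64.urlsafe_b64decode(part + "=" * (-len(part) % 4)))

    jwt = ("eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9"
           ".eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIn0"
           ".SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c")

    for part in jwt.split('.'):
        if part.startswith("eyJ"):
            print(decode_part(part))  # header: {'alg': 'HS256', 'typ': 'JWT'}
        else:
            print(f"signature: {shannon_entropy(part):.2f} bits over {len(part)} chars")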
9 changes: 5 additions & 4 deletions credsweeper/rules/config.yaml
@@ -349,17 +349,18 @@

 - name: JSON Web Token
   severity: medium
-  confidence: moderate
+  confidence: strong
   type: pattern
   values:
-    - (?<![.0-9A-Za-z_+-])(?P<value>eyJ[0-9A-Za-z_=-]{15,8000}([.0-9A-Za-z_=-]{1,8000})?)
-  filter_type: GeneralPattern
-  use_ml: true
+    - (?<![=\w/+-])(?P<value>eyJ[\w=-]{15,8000}(\.[=\w/+-]{16,8000}){,16})
+  filter_type:
+    - ValueJwtCheck
   required_substrings:
     - eyJ
   min_line_len: 18
   target:
     - code
+    - doc

 - name: MailChimp API Key
   severity: high
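The tightened pattern anchors on the eyJ prefix and allows up to sixteen further dot-separated base64url parts; {,16} is valid Python re syntax for zero to sixteen repetitions. A minimal illustration (the token is a harmless sample, not a real secret):

    import re

    # Pattern copied verbatim from the rule above
    pattern = re.compile(r"(?<![=\w/+-])(?P<value>eyJ[\w=-]{15,8000}(\.[=\w/+-]{16,8000}){,16})")

    line = 'token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIn0.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c"'
    if match := pattern.search(line):
        print(match.group("value"))  # the full three-part token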
112 changes: 61 additions & 51 deletions experiment/src/entropy_test.py
@@ -1,16 +1,27 @@
 #!/usr/bin/env python3
 
+"""
+The script is used in experiment to get statistical distribution of Shannon entropy
+of a line which was obtained with an encoding (base64, base32, etc.) from randomly generated bytes.
+The result format is:
+# size of encoded string: (mean of entropy, standard deviation)
+"""
+
 import base64
 import random
 import signal
 import statistics
 import string
 import threading
 import time
 from datetime import datetime
 from multiprocessing import Pool
 from typing import Tuple, Dict
 
+import matplotlib.pyplot as plt
+import numpy as np
+from scipy.optimize import curve_fit
+
 from credsweeper.common.constants import Chars
-from credsweeper.filters import ValueEntropyBase36Check
+# from credsweeper.filters import ValueEntropyBase36Check
 from credsweeper.utils import Util
 
 random_data: str
@@ -28,9 +39,9 @@ def evaluate_avg(_args: Tuple[int, float, float]) -> Tuple[float, float]:
     entropies = []
     for x in range(ITERATIONS):
         offset = x * size
-        # entropy = Util.get_shannon_entropy(random_data[offset:offset + size], Chars.BASE64_CHARS.value)
+        entropy = Util.get_shannon_entropy(random_data[offset:offset + size], Chars.BASE64_CHARS.value)
         # entropy = Util.get_shannon_entropy(random_data[offset:offset + size], Chars.BASE36_CHARS.value)
-        entropy = Util.get_shannon_entropy(random_data[offset:offset + size], Chars.BASE32_CHARS.value)
+        # entropy = Util.get_shannon_entropy(random_data[offset:offset + size], Chars.BASE32_CHARS.value)
         entropies.append(entropy)
     avg = statistics.mean(entropies)
     dvt = statistics.stdev(entropies, avg)
@@ -40,15 +51,15 @@ def evaluate_avg(_args: Tuple[int, float, float]) -> Tuple[float, float]:
     return min_avg, min_dvt


-if __name__ == "__main__":
-
-    stats: Dict[int, Tuple[float, float]] = {}
-    sizes = [12, 13, 15, 16, 17, 31, 32, 33]
+def generate(start, end) -> Dict[int, Tuple[float, float]]:
+    stats: Dict[int, Tuple[float, float]] = {}  # type: ignore
+    sizes = [x for x in range(start, end)]
     global random_data
     try:
         for n in range(1000):
             start_time = time.time()
             rand_bytes = random.randbytes(int(8 * ITERATIONS * max(sizes) / 5))
-            random_data = base64.b32encode(rand_bytes).decode('ascii')
+            random_data = base64.b64encode(rand_bytes).decode('ascii')
             # random_data = ''.join(
             #     [random.choice(string.digits + string.ascii_lowercase) for _ in range(ITERATIONS * max(sizes))])
             _args = [(i, stats[i][0] if i in stats else 9.9, stats[i][1] if i in stats else 0.0) for i in sizes]
@@ -57,48 +68,47 @@ def evaluate_avg(_args: Tuple[int, float, float]) -> Tuple[float, float]:
                 with threading.Lock():
                     stats[_size] = _res
             print(f"done {n} in {time.time() - start_time}", flush=True)
-            for k, v in stats.items():
-                print(f"{k} = {v}", flush=True)
     except KeyboardInterrupt as exc:
         print(exc)
     finally:
-        print("===========================================================")
+        print("===========================================================", flush=True)
         for k, v in stats.items():
-            print(f"{k} = {v}", flush=True)
-
-    # base32
-    # 12 = (3.2448401902687922, 0.2001867347580528)
-    # 13 = (3.3305754195719484, 0.1987638281794566)
-    # 15 = (3.4840904247691813, 0.192504685389475)
-    # 16 = (3.544861791803441, 0.184688685917545)
-    # 17 = (3.613827056321014, 0.18707867741897827)
-    # 31 = (4.15268463818445, 0.1486133074700339)
-    # 32 = (4.177896164672521, 0.1472328639816872)
-    # 33 = (4.197883981615083, 0.14735097649694248)
-
-    # base36
-    # 14 = (3.4457644517398167, 0.18990807349700253)
-    # 15 = (3.5260346505689992, 0.18114901125908447)
-    # 16 = (3.598032662269341, 0.1830565384431312)
-    # 17 = (3.659276363856176, 0.1856434289456263)
-    # 23 = (3.963851572519515, 0.16574824489877288)
-    # 24 = (4.00254984568254, 0.1623406588528336)
-    # 25 = (4.040134902813914, 0.158720524449059)
-    # 26 = (4.078098075953585, 0.15933209429031434)
-
-    # base64
-    # 15 = (3.6775207689256977, 0.15381412670043787)
-    # 16 = (3.7600552609204625, 0.15666871578775507)
-    # 17 = (3.835262182966267, 0.1514079815395568)
-    # 18 = (3.899273202112598, 0.15521615494595756)
-    # 19 = (3.9669074540527136, 0.15022181070460836)
-    # 20 = (4.026675938018028, 0.1477139960335224)
-    # 21 = (4.0844028599694155, 0.14611461336723608)
-    # 23 = (4.1880028531766245, 0.14668346833164134)
-    # 24 = (4.236982996273627, 0.14220068825454704)
-    # 25 = (4.283528241641759, 0.14323971561083385)
-    # 31 = (4.5121865964712535, 0.1393228408491736)
-    # 32 = (4.545556887485041, 0.13347416608982715)
-    # 33 = (4.576938427997454, 0.1300362152603773)
-    # 39 = (4.743676039379888, 0.13053505168803348)
-    # 40 = (4.76769110698625, 0.1307074052311964)
+            print(f"{k}: {v},", flush=True)
+    return stats
+
+
+def log_model(x, k4, k3, k2, k1, k0):
+    return k4 * np.log2(x) ** 4 + k3 * np.log2(x) ** 3 + k2 * np.log2(x) ** 2 + k1 * np.log2(x) + k0
+
+
+def solve(data: dict[int, Tuple[float, float]]):
+    d_list = list((x, y) for x, y in data.items())
+    d_list.sort(key=lambda x: (int(x[0])))
+
+    plt.figure()
+    x = [int(i[0]) for i in d_list]
+    y = [i[1][0] for i in d_list]
+    y_min = [i[1][0] - i[1][1] for i in d_list]
+    y_max = [i[1][0] + i[1][1] for i in d_list]
+    plt.plot(x, y, 'r-', lw=2, label='ent')
+    plt.plot(x, y_min, 'r:', lw=1, label='min')
+    plt.plot(x, y_max, 'r:', lw=1, label='max')
+
+    _y = np.array(y_min)
+    _x = np.array(x)
+
+    params, covariance = curve_fit(log_model, _x, _y)
+    print(params)
+    k4, k3, k2, k1, k0 = params
+    plt.plot(x, log_model(x, k4, k3, k2, k1, k0), 'b--', label='fit')
+
+    plt.grid(True)
+    plt.show()
+
+
+if __name__ == "__main__":
+    data_file = "base64entr_12_1200.json"  # [0.00147696 -0.03688593 0.24484864 0.31841099 0.39320007]
+    if not (_data := Util.json_load(data_file)):
+        _data = generate(12, 1200)
+        Util.json_dump(_data, data_file)
+    solve(_data)
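Since log_model is linear in its coefficients, curve_fit reduces to an ordinary least-squares fit of the lower envelope (mean minus stdev). A minimal reproduction on a small subset of the base64 measurements from the comment block deleted above; with the full 12..1200 range the fit lands near the bracketed coefficients in the data_file comment:

    import numpy as np
    from scipy.optimize import curve_fit

    def log_model(x, k4, k3, k2, k1, k0):
        return k4 * np.log2(x) ** 4 + k3 * np.log2(x) ** 3 + k2 * np.log2(x) ** 2 + k1 * np.log2(x) + k0

    # {size: (mean entropy, stdev)} - subset of the base64 statistics above
    stats = {15: (3.6775, 0.1538), 16: (3.7601, 0.1567), 20: (4.0267, 0.1477),
             24: (4.2370, 0.1422), 32: (4.5456, 0.1335), 40: (4.7677, 0.1307)}
    x = np.array(sorted(stats))
    y_min = np.array([stats[i][0] - stats[i][1] for i in sorted(stats)])  # lower envelope, as solve() does
    params, _ = curve_fit(log_model, x, y_min)
    print(params)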
12 changes: 6 additions & 6 deletions tests/__init__.py
@@ -7,18 +7,18 @@
 NEGLIGIBLE_ML_THRESHOLD = 0.00001
 
 # credentials count after scan
-SAMPLES_CRED_COUNT: int = 410
-SAMPLES_CRED_LINE_COUNT: int = 427
+SAMPLES_CRED_COUNT: int = 403
+SAMPLES_CRED_LINE_COUNT: int = 420
 
 # credentials count after post-processing
-SAMPLES_POST_CRED_COUNT: int = 378
+SAMPLES_POST_CRED_COUNT: int = 371
 
 # with option --doc
-SAMPLES_IN_DOC = 404
+SAMPLES_IN_DOC = 401
 
 # archived credentials that are not found without --depth
-SAMPLES_IN_DEEP_1 = SAMPLES_POST_CRED_COUNT + 21
-SAMPLES_IN_DEEP_2 = SAMPLES_IN_DEEP_1 + 18
+SAMPLES_IN_DEEP_1 = SAMPLES_POST_CRED_COUNT + 22
+SAMPLES_IN_DEEP_2 = SAMPLES_IN_DEEP_1 + 17
 SAMPLES_IN_DEEP_3 = SAMPLES_IN_DEEP_2 + 1
 
 # well known string with all latin letters
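Resolved with the new values, the depth-scan expectations work out as follows (a simple arithmetic check):

    SAMPLES_IN_DEEP_1 = 371 + 22  # = 393
    SAMPLES_IN_DEEP_2 = 393 + 17  # = 410
    SAMPLES_IN_DEEP_3 = 410 + 1   # = 411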