Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade FCT Deobfuscator #31

Merged
merged 10 commits into from
Mar 3, 2024
95 changes: 79 additions & 16 deletions src/vipyr_deobf/cli.py
Original file line number Diff line number Diff line change
@@ -1,32 +1,56 @@
import argparse
from typing import Callable, NoReturn, TextIO, TypeVar
import logging
import logging.config
from typing import Callable, NoReturn, TextIO, TypeVar, override

from .deobfuscators.hyperion import format_hyperion, deobf_hyperion
from .deobfuscators.lzmaspam import format_lzma_b64, deobf_lzma_b64
from .deobfuscators.vare import format_vare, deobf_vare
from .deobfuscators.not_pyobfuscate import format_not_pyobfuscate, deobf_not_pyobfuscate
from .deobfuscators.fct import deobf_fct, format_fct
from .deobfuscators.hyperion import deobf_hyperion, format_hyperion
from .deobfuscators.lzmaspam import deobf_lzma_b64, format_lzma_b64
from .deobfuscators.vare import deobf_vare, format_vare
from .exceptions import DeobfuscationFailError, InvalidSchemaError

R = TypeVar('R')

supported_obfuscators: dict[str, tuple[Callable[[TextIO], R], Callable[[R], str]]] = {
'hyperion': (deobf_hyperion, format_hyperion),
'lzmaspam': (deobf_lzma_b64, format_lzma_b64),
'vare': (deobf_vare, format_vare),
'not_pyobfuscate': (deobf_not_pyobfuscate, format_not_pyobfuscate),
'vore': (deobf_vare, format_vare),
'fct': (deobf_fct, format_fct),
}

alias_dict: dict[str, str] = {
'vore': 'vare',
'hyperd': 'hyperion',
'fct-obfuscate': 'not_pyobfuscate',
'fct_obfuscate': 'fct',
'not_pyobfuscate': 'fct',
}


class Color:
clear = '\x1b[0m'
red = '\x1b[0;31m'
green = '\x1b[0;32m'
yellow = '\x1b[0;33m'
blue = '\x1b[0;34m'
white = '\x1b[0;37m'
bold_red = '\x1b[1;31m'
bold_green = '\x1b[1;32m'
bold_yellow = '\x1b[1;33m'
bold_blue = '\x1b[1;34m'
bold_white = '\x1b[1;37m'


class NoSoftWarning(logging.Filter):
@override
def filter(self, record: logging.LogRecord) -> bool:
return not record.msg.endswith('(Expected)')


def run_deobf(file: TextIO, deobf_type: str) -> NoReturn:
deobf_type = deobf_type.replace('-', '_')
deobf_type = alias_dict.get(deobf_type, deobf_type)
if deobf_type not in supported_obfuscators:
raise InvalidSchemaError([*supported_obfuscators])
raise InvalidSchemaError

deobf_func, format_func = supported_obfuscators[deobf_type]
results = deobf_func(file)
Expand All @@ -38,19 +62,58 @@ def run():
prog='Vipyr Deobfuscator',
description='Deobfuscates obfuscated scripts',
)
parser.add_argument('-p', '--path')
parser.add_argument('-t', '--type')
parser.add_argument('-p', '--path', required=True, help='path to obfuscated file')
parser.add_argument('-t', '--type', help='type of obfuscation used')
parser.add_argument('-d', '--debug', action='store_true', help='display debug logs (defaults to false)')
parser.add_argument('-s', '--soft', action='store_true', help='display expected warnings (defaults to false)')
args = parser.parse_args()

logger = logging.getLogger('deobf')
logging_config = {
'version': 1,
'disable_existing_loggers': False,
'formatters': {
'default': {
'format': f'{Color.bold_yellow}[{Color.blue}%(asctime)s{Color.bold_yellow}]'
f'{Color.bold_white}:{Color.green}%(levelname)s'
f'{Color.bold_white}:{Color.red}%(message)s{Color.clear}'
}
},
'handlers': {
'stdout': {
'class': 'logging.StreamHandler',
'formatter': 'default',
'stream': 'ext://sys.stdout',
'filters': [] if args.soft else ['no_soft_warning']
}
},
'filters': {
'no_soft_warning': {'()': 'src.vipyr_deobf.cli.NoSoftWarning'}
},
'loggers': {
'root': {
'level': 'DEBUG' if args.debug else 'INFO',
'handlers': ['stdout']
}
}
}
logging.config.dictConfig(logging_config)

try:
with open(args.path, 'r') as file:
run_deobf(file, args.type)
except FileNotFoundError:
print(f'{args.path} is not a valid path.')
except InvalidSchemaError as exc:
print(exc)
logger.error(f'{args.path} is not a valid path.')
except InvalidSchemaError:
logger.error(
f'Unsupported obfuscation schema.\n'
f'Supported obfuscation schemes include:\n'
f'{", ".join(supported_obfuscators)}'
)
except DeobfuscationFailError as exc:
print(f'Deobfuscation of {args.path} with schema <{args.type}> failed:')
print(exc)
logger.exception(f'Deobfuscation of {args.path} with schema <{args.type}> failed:')
for var, data in exc.env_vars.items():
print(f'{Color.bold_red}{var}{Color.clear}', data, sep='\n', end='\n\n')


if __name__ == '__main__':
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import ast
import base64
import binascii
import logging
import re
import zlib
from io import StringIO
Expand All @@ -9,6 +10,10 @@
from ..exceptions import DeobfuscationFailError
from ..utils import BYTES_WEBHOOK_REGEX

logger = logging.getLogger('deobf')

MAX_DEOBF_LIMIT = 1000


class ByteStringFinder(ast.NodeVisitor):
def __init__(self):
Expand All @@ -20,30 +25,25 @@ def visit_Constant(self, node):


def nab_surface_payload(surface_code: str) -> bytes:
logger.info('Nabbing surface payload')
try:
tree = ast.parse(surface_code)
# Other exceptions may appear, generalize this in the future
except SyntaxError as exc:
except SyntaxError:
logger.exception('Input text is not valid python')
raise DeobfuscationFailError(
'Input text is not valid python',
severity='critical',
status='expected',
exc=exc,
surface_code = surface_code
)
bsf = ByteStringFinder()
bsf.visit(tree)
if len(bsf.results) > 1:
logger.error('Multiple byte strings found')
raise DeobfuscationFailError(
f'Multiple byte strings found:\n{'\n'.join(map(repr, bsf.results))}',
severity='medium',
status='expected',
bsf_results = bsf.results
)
elif not bsf.results:
raise DeobfuscationFailError(
'No byte strings found in surface file',
severity='critical',
status='expected',
)
logger.error('No byte strings found in surface file')
raise DeobfuscationFailError()
return bsf.results[0]


Expand All @@ -56,14 +56,13 @@ def deobf_obf(obf_bytes: bytes) -> bytes:
try:
result = base64.b64decode(result)
except zlib.error:
logger.warning('base64 failed, trying base32 now')
result = base64.b32decode(result)
result = zlib.decompress(result)
except (binascii.Error, zlib.error) as exc:
except (binascii.Error, zlib.error):
logger.exception(f'Error in deobf_obf when trying to deobfuscate bytes')
raise DeobfuscationFailError(
f'Error in deobf_obf when trying to deobfuscate {obf_bytes}',
severity='critical',
status='expected',
exc=exc,
obf_bytes = obf_bytes
)
return result

Expand All @@ -73,35 +72,32 @@ def nab_bytes(marshalled_bytes: bytes) -> bytes:
Nabs the payload bytes from the marshalled code object
Tries the shortcut with the hardcoded payload index first, and then tries regex
"""
logger.info('Nabbing bytes from marshalled data')
try:
return index_nab_bytes(marshalled_bytes)
except DeobfuscationFailError:
pass
# TODO: in the future, raise a warning if this happens
return regex_nab_bytes(marshalled_bytes)
logger.warning('Index failed, using regex (Expected)')
return regex_nab_bytes(marshalled_bytes)


def index_nab_bytes(marshalled_bytes: bytes) -> bytes:
"""
Uses the hardcoded index of 73 to grab the payload
"""
logger.debug('Entering index_nab_bytes')
header = marshalled_bytes[73:79]
if header[:2] != b'\x02s':
raise DeobfuscationFailError(
'Bytes at index 73 is not header for bytes',
severity='low',
status='expected',
)
logger.warning('Bytes at index 73 is not header for bytes (Expected)')
raise DeobfuscationFailError()
payload_len = int.from_bytes(header[2:][::-1])
payload_start = 79
payload_end = payload_start + payload_len
payload = marshalled_bytes[payload_start:payload_end]
trailer = marshalled_bytes[payload_end:payload_end + 2]
if trailer != b'N)':
logger.error('Malformed marshal payload, length does not match trailer')
raise DeobfuscationFailError(
'Malformed marshal payload, length does not match trailer',
severity='low',
status='expected',
marshalled_bytes = marshalled_bytes
)
return payload

Expand All @@ -110,6 +106,7 @@ def regex_nab_bytes(marshalled_bytes: bytes) -> bytes:
"""
Uses regex to grab the payload
"""
logger.debug("Entering regex_nab_bytes (This shouldn't happen more than once)")
# Keep track of the current idx, so we can discard '\x02s' headers when they appear within other strings
current_idx = 0
rtn_bytes = []
Expand All @@ -121,42 +118,69 @@ def regex_nab_bytes(marshalled_bytes: bytes) -> bytes:
payload = marshalled_bytes[payload_start: payload_start + payload_len]
trailer = marshalled_bytes[payload_start + payload_len: payload_start + payload_len + 2]
if trailer != b'N)':
logger.warning('Malformed marshal payload, length does not match trailer')
raise DeobfuscationFailError(
'Malformed marshal payload, length does not match trailer',
severity='low',
status='expected',
marshalled_bytes = marshalled_bytes
)
rtn_bytes.append(payload)
current_idx = payload_start + payload_len + 2
rtn_bytes = [1, 2, 3]
if len(rtn_bytes) > 1:
logger.error('Multiple payloads found in bytes (Expected)')
raise DeobfuscationFailError(
'Multiple payloads found in bytes',
severity='low',
status='expected',
marshalled_bytes = marshalled_bytes,
rtn_bytes = rtn_bytes,
)
elif not rtn_bytes:
logger.warning('No payload found (Expected)')
raise DeobfuscationFailError(
marshalled_bytes = marshalled_bytes
)
return rtn_bytes[0]


def deobf_not_pyobfuscate(file: TextIO) -> bytes:
def deobf_fct(file: TextIO) -> bytes:
"""
Deobfuscates the not pyobfuscate schema
:return: Marshalled code object of the source code
"""
logger.info('Deobfuscating FCT format')
obf_bytes = nab_surface_payload(file.read())
while True:

for iteration in range(MAX_DEOBF_LIMIT):
logger.info(f'Deobfuscating bytes (iteration {iteration})')
marshalled_bytes = deobf_obf(obf_bytes)
if marshalled_bytes.startswith(b'exec((_)(b'):
logger.debug('Byte string is not marshalled')
obf_bytes = marshalled_bytes[11:-3]
continue
logger.debug('Byte string is marshalled')
try:
obf_bytes = nab_bytes(marshalled_bytes)
except DeobfuscationFailError:
return marshalled_bytes
if not obf_bytes:
return marshalled_bytes
logging.warning('Reached byte deobfuscation limit of 100, ending now')
raise DeobfuscationFailError(
marshalled_bytes = marshalled_bytes
)


def format_not_pyobfuscate(marshalled_bytes: bytes) -> str:
def format_fct(marshalled_bytes: bytes) -> str:
logger.info('Obfuscation complete, formatting output')
webhooks = BYTES_WEBHOOK_REGEX.findall(marshalled_bytes)

rtn_string = StringIO()
rtn_string.write(f'Marshalled bytes:\n{marshalled_bytes!r}\n')

try:
output = marshalled_bytes.decode()
except UnicodeDecodeError:
logger.info('Output appears to be marshalled')
rtn_string.write(f'Marshalled bytes:\n{marshalled_bytes!r}\n')
else:
logger.info('Output appears to be normal python')
rtn_string.write(f'Output:\n{output}')

if webhooks:
rtn_string.write('\nWebhooks found:\n')
Expand Down
Loading