From d9cb91f59f87f744b4cc2eb856cc9749781b2ecb Mon Sep 17 00:00:00 2001 From: Stickie <71910589+FieryIceStickie@users.noreply.github.com> Date: Mon, 4 Mar 2024 01:59:28 +1100 Subject: [PATCH] Upgrade FCT Deobfuscator (#31) * - Rename not_pyobfuscate to fct * - Add logging.py * Delete logging.py Signed-off-by: stickie * Adds logger to cli.py Signed-off-by: stickie * Add logging to fct.py Signed-off-by: stickie <71910589+FieryIceStickie@users.noreply.github.com> * Add logging to fct.py Signed-off-by: stickie <71910589+FieryIceStickie@users.noreply.github.com> * Upgrade FCT deobfuscator and improve CLI - Added --debug and --soft flags to argparse to modify logging config - Added logging - Added logging to FCT - Added FCT deobfuscation for when byte strings aren't marshalled Signed-off-by: stickie <71910589+FieryIceStickie@users.noreply.github.com> * Ran isort Signed-off-by: stickie <71910589+FieryIceStickie@users.noreply.github.com> --------- Signed-off-by: stickie Signed-off-by: stickie <71910589+FieryIceStickie@users.noreply.github.com> --- src/vipyr_deobf/cli.py | 95 +++++++++++++--- .../{not_pyobfuscate.py => fct.py} | 102 +++++++++++------- src/vipyr_deobf/exceptions.py | 46 ++------ 3 files changed, 149 insertions(+), 94 deletions(-) rename src/vipyr_deobf/deobfuscators/{not_pyobfuscate.py => fct.py} (58%) diff --git a/src/vipyr_deobf/cli.py b/src/vipyr_deobf/cli.py index 112f8fb..ad5569a 100644 --- a/src/vipyr_deobf/cli.py +++ b/src/vipyr_deobf/cli.py @@ -1,10 +1,12 @@ import argparse -from typing import Callable, NoReturn, TextIO, TypeVar +import logging +import logging.config +from typing import Callable, NoReturn, TextIO, TypeVar, override -from .deobfuscators.hyperion import format_hyperion, deobf_hyperion -from .deobfuscators.lzmaspam import format_lzma_b64, deobf_lzma_b64 -from .deobfuscators.vare import format_vare, deobf_vare -from .deobfuscators.not_pyobfuscate import format_not_pyobfuscate, deobf_not_pyobfuscate +from .deobfuscators.fct import deobf_fct, format_fct +from .deobfuscators.hyperion import deobf_hyperion, format_hyperion +from .deobfuscators.lzmaspam import deobf_lzma_b64, format_lzma_b64 +from .deobfuscators.vare import deobf_vare, format_vare from .exceptions import DeobfuscationFailError, InvalidSchemaError R = TypeVar('R') @@ -12,21 +14,43 @@ supported_obfuscators: dict[str, tuple[Callable[[TextIO], R], Callable[[R], str]]] = { 'hyperion': (deobf_hyperion, format_hyperion), 'lzmaspam': (deobf_lzma_b64, format_lzma_b64), - 'vare': (deobf_vare, format_vare), - 'not_pyobfuscate': (deobf_not_pyobfuscate, format_not_pyobfuscate), + 'vore': (deobf_vare, format_vare), + 'fct': (deobf_fct, format_fct), } alias_dict: dict[str, str] = { 'vore': 'vare', 'hyperd': 'hyperion', - 'fct-obfuscate': 'not_pyobfuscate', + 'fct_obfuscate': 'fct', + 'not_pyobfuscate': 'fct', } +class Color: + clear = '\x1b[0m' + red = '\x1b[0;31m' + green = '\x1b[0;32m' + yellow = '\x1b[0;33m' + blue = '\x1b[0;34m' + white = '\x1b[0;37m' + bold_red = '\x1b[1;31m' + bold_green = '\x1b[1;32m' + bold_yellow = '\x1b[1;33m' + bold_blue = '\x1b[1;34m' + bold_white = '\x1b[1;37m' + + +class NoSoftWarning(logging.Filter): + @override + def filter(self, record: logging.LogRecord) -> bool: + return not record.msg.endswith('(Expected)') + + def run_deobf(file: TextIO, deobf_type: str) -> NoReturn: + deobf_type = deobf_type.replace('-', '_') deobf_type = alias_dict.get(deobf_type, deobf_type) if deobf_type not in supported_obfuscators: - raise InvalidSchemaError([*supported_obfuscators]) + raise InvalidSchemaError deobf_func, format_func = supported_obfuscators[deobf_type] results = deobf_func(file) @@ -38,19 +62,58 @@ def run(): prog='Vipyr Deobfuscator', description='Deobfuscates obfuscated scripts', ) - parser.add_argument('-p', '--path') - parser.add_argument('-t', '--type') + parser.add_argument('-p', '--path', required=True, help='path to obfuscated file') + parser.add_argument('-t', '--type', help='type of obfuscation used') + parser.add_argument('-d', '--debug', action='store_true', help='display debug logs (defaults to false)') + parser.add_argument('-s', '--soft', action='store_true', help='display expected warnings (defaults to false)') args = parser.parse_args() + + logger = logging.getLogger('deobf') + logging_config = { + 'version': 1, + 'disable_existing_loggers': False, + 'formatters': { + 'default': { + 'format': f'{Color.bold_yellow}[{Color.blue}%(asctime)s{Color.bold_yellow}]' + f'{Color.bold_white}:{Color.green}%(levelname)s' + f'{Color.bold_white}:{Color.red}%(message)s{Color.clear}' + } + }, + 'handlers': { + 'stdout': { + 'class': 'logging.StreamHandler', + 'formatter': 'default', + 'stream': 'ext://sys.stdout', + 'filters': [] if args.soft else ['no_soft_warning'] + } + }, + 'filters': { + 'no_soft_warning': {'()': 'src.vipyr_deobf.cli.NoSoftWarning'} + }, + 'loggers': { + 'root': { + 'level': 'DEBUG' if args.debug else 'INFO', + 'handlers': ['stdout'] + } + } + } + logging.config.dictConfig(logging_config) + try: with open(args.path, 'r') as file: run_deobf(file, args.type) except FileNotFoundError: - print(f'{args.path} is not a valid path.') - except InvalidSchemaError as exc: - print(exc) + logger.error(f'{args.path} is not a valid path.') + except InvalidSchemaError: + logger.error( + f'Unsupported obfuscation schema.\n' + f'Supported obfuscation schemes include:\n' + f'{", ".join(supported_obfuscators)}' + ) except DeobfuscationFailError as exc: - print(f'Deobfuscation of {args.path} with schema <{args.type}> failed:') - print(exc) + logger.exception(f'Deobfuscation of {args.path} with schema <{args.type}> failed:') + for var, data in exc.env_vars.items(): + print(f'{Color.bold_red}{var}{Color.clear}', data, sep='\n', end='\n\n') if __name__ == '__main__': diff --git a/src/vipyr_deobf/deobfuscators/not_pyobfuscate.py b/src/vipyr_deobf/deobfuscators/fct.py similarity index 58% rename from src/vipyr_deobf/deobfuscators/not_pyobfuscate.py rename to src/vipyr_deobf/deobfuscators/fct.py index 116fc72..93d0260 100644 --- a/src/vipyr_deobf/deobfuscators/not_pyobfuscate.py +++ b/src/vipyr_deobf/deobfuscators/fct.py @@ -1,6 +1,7 @@ import ast import base64 import binascii +import logging import re import zlib from io import StringIO @@ -9,6 +10,10 @@ from ..exceptions import DeobfuscationFailError from ..utils import BYTES_WEBHOOK_REGEX +logger = logging.getLogger('deobf') + +MAX_DEOBF_LIMIT = 1000 + class ByteStringFinder(ast.NodeVisitor): def __init__(self): @@ -20,30 +25,25 @@ def visit_Constant(self, node): def nab_surface_payload(surface_code: str) -> bytes: + logger.info('Nabbing surface payload') try: tree = ast.parse(surface_code) # Other exceptions may appear, generalize this in the future - except SyntaxError as exc: + except SyntaxError: + logger.exception('Input text is not valid python') raise DeobfuscationFailError( - 'Input text is not valid python', - severity='critical', - status='expected', - exc=exc, + surface_code = surface_code ) bsf = ByteStringFinder() bsf.visit(tree) if len(bsf.results) > 1: + logger.error('Multiple byte strings found') raise DeobfuscationFailError( - f'Multiple byte strings found:\n{'\n'.join(map(repr, bsf.results))}', - severity='medium', - status='expected', + bsf_results = bsf.results ) elif not bsf.results: - raise DeobfuscationFailError( - 'No byte strings found in surface file', - severity='critical', - status='expected', - ) + logger.error('No byte strings found in surface file') + raise DeobfuscationFailError() return bsf.results[0] @@ -56,14 +56,13 @@ def deobf_obf(obf_bytes: bytes) -> bytes: try: result = base64.b64decode(result) except zlib.error: + logger.warning('base64 failed, trying base32 now') result = base64.b32decode(result) result = zlib.decompress(result) - except (binascii.Error, zlib.error) as exc: + except (binascii.Error, zlib.error): + logger.exception(f'Error in deobf_obf when trying to deobfuscate bytes') raise DeobfuscationFailError( - f'Error in deobf_obf when trying to deobfuscate {obf_bytes}', - severity='critical', - status='expected', - exc=exc, + obf_bytes = obf_bytes ) return result @@ -73,35 +72,32 @@ def nab_bytes(marshalled_bytes: bytes) -> bytes: Nabs the payload bytes from the marshalled code object Tries the shortcut with the hardcoded payload index first, and then tries regex """ + logger.info('Nabbing bytes from marshalled data') try: return index_nab_bytes(marshalled_bytes) except DeobfuscationFailError: - pass - # TODO: in the future, raise a warning if this happens - return regex_nab_bytes(marshalled_bytes) + logger.warning('Index failed, using regex (Expected)') + return regex_nab_bytes(marshalled_bytes) def index_nab_bytes(marshalled_bytes: bytes) -> bytes: """ Uses the hardcoded index of 73 to grab the payload """ + logger.debug('Entering index_nab_bytes') header = marshalled_bytes[73:79] if header[:2] != b'\x02s': - raise DeobfuscationFailError( - 'Bytes at index 73 is not header for bytes', - severity='low', - status='expected', - ) + logger.warning('Bytes at index 73 is not header for bytes (Expected)') + raise DeobfuscationFailError() payload_len = int.from_bytes(header[2:][::-1]) payload_start = 79 payload_end = payload_start + payload_len payload = marshalled_bytes[payload_start:payload_end] trailer = marshalled_bytes[payload_end:payload_end + 2] if trailer != b'N)': + logger.error('Malformed marshal payload, length does not match trailer') raise DeobfuscationFailError( - 'Malformed marshal payload, length does not match trailer', - severity='low', - status='expected', + marshalled_bytes = marshalled_bytes ) return payload @@ -110,6 +106,7 @@ def regex_nab_bytes(marshalled_bytes: bytes) -> bytes: """ Uses regex to grab the payload """ + logger.debug("Entering regex_nab_bytes (This shouldn't happen more than once)") # Keep track of the current idx, so we can discard '\x02s' headers when they appear within other strings current_idx = 0 rtn_bytes = [] @@ -121,42 +118,69 @@ def regex_nab_bytes(marshalled_bytes: bytes) -> bytes: payload = marshalled_bytes[payload_start: payload_start + payload_len] trailer = marshalled_bytes[payload_start + payload_len: payload_start + payload_len + 2] if trailer != b'N)': + logger.warning('Malformed marshal payload, length does not match trailer') raise DeobfuscationFailError( - 'Malformed marshal payload, length does not match trailer', - severity='low', - status='expected', + marshalled_bytes = marshalled_bytes ) rtn_bytes.append(payload) current_idx = payload_start + payload_len + 2 + rtn_bytes = [1, 2, 3] if len(rtn_bytes) > 1: + logger.error('Multiple payloads found in bytes (Expected)') raise DeobfuscationFailError( - 'Multiple payloads found in bytes', - severity='low', - status='expected', + marshalled_bytes = marshalled_bytes, + rtn_bytes = rtn_bytes, + ) + elif not rtn_bytes: + logger.warning('No payload found (Expected)') + raise DeobfuscationFailError( + marshalled_bytes = marshalled_bytes ) return rtn_bytes[0] -def deobf_not_pyobfuscate(file: TextIO) -> bytes: +def deobf_fct(file: TextIO) -> bytes: """ Deobfuscates the not pyobfuscate schema :return: Marshalled code object of the source code """ + logger.info('Deobfuscating FCT format') obf_bytes = nab_surface_payload(file.read()) - while True: + + for iteration in range(MAX_DEOBF_LIMIT): + logger.info(f'Deobfuscating bytes (iteration {iteration})') marshalled_bytes = deobf_obf(obf_bytes) + if marshalled_bytes.startswith(b'exec((_)(b'): + logger.debug('Byte string is not marshalled') + obf_bytes = marshalled_bytes[11:-3] + continue + logger.debug('Byte string is marshalled') try: obf_bytes = nab_bytes(marshalled_bytes) except DeobfuscationFailError: return marshalled_bytes if not obf_bytes: return marshalled_bytes + logging.warning('Reached byte deobfuscation limit of 100, ending now') + raise DeobfuscationFailError( + marshalled_bytes = marshalled_bytes + ) -def format_not_pyobfuscate(marshalled_bytes: bytes) -> str: +def format_fct(marshalled_bytes: bytes) -> str: + logger.info('Obfuscation complete, formatting output') webhooks = BYTES_WEBHOOK_REGEX.findall(marshalled_bytes) + rtn_string = StringIO() - rtn_string.write(f'Marshalled bytes:\n{marshalled_bytes!r}\n') + + try: + output = marshalled_bytes.decode() + except UnicodeDecodeError: + logger.info('Output appears to be marshalled') + rtn_string.write(f'Marshalled bytes:\n{marshalled_bytes!r}\n') + else: + logger.info('Output appears to be normal python') + rtn_string.write(f'Output:\n{output}') if webhooks: rtn_string.write('\nWebhooks found:\n') diff --git a/src/vipyr_deobf/exceptions.py b/src/vipyr_deobf/exceptions.py index 9af5e1b..3f44aa1 100644 --- a/src/vipyr_deobf/exceptions.py +++ b/src/vipyr_deobf/exceptions.py @@ -1,5 +1,5 @@ -import traceback -from typing import Literal +import inspect +from typing import Any class Error(Exception): @@ -7,49 +7,17 @@ class Error(Exception): class InvalidSchemaError(Error): - def __init__(self, supported: list[str]): - self.supported = supported - - def __str__(self): - return (f'Unsupported obfuscation schema.\n' - f'Supported obfuscation schemes include:\n' - f'{", ".join(self.supported)}') + pass class DeobfuscationFailError(Error): """ A generic exception for when deobfuscation fails """ - def __init__( - self, - msg: str, - severity: str, - status: Literal['expected', 'unexpected'], - exc: Exception | None = None - ): + def __init__(self, **env_vars): """ - Deobfuscation Failure Innit - :param msg: A string with details on the cause of the exception - :param severity: Severity of the exception. Ranges from Low to Critical: - - Critical: An error occurred and the program cannot proceed. - - High: An error occurred and the program cannot proceed, but the user can provide input to fix the issue. - - Mid: A check that should pass has gone off, and program can proceed. Prompt user for confirmation. - - Low: A check that sometimes passes has gone off, and program can proceed. Warn the user. - :param status: 'expected' if the program raises this at some point, otherwise 'unexpected' - :param exc: An optional exception object containing the exception that was raised + Deobfuscation Failure + :param env_vars: Relevant environment variables for debugging """ - # Note: Prompting the user is fiddly and has not been implemented yet, - # so currently all of these severities are the same functionally - self.msg = msg - self.severity = severity - self.status = status - self.exc = exc + self.env_vars = env_vars super().__init__() - - def __str__(self): - # TODO: make better, maybe Enum - return f""" -\033[0;32mSeverity:\033[0m {self.severity} -\033[0;32mStatus:\033[0m {self.status} -\033[0;32mMessage:\033[0m {self.msg} -"""