Skip to content

Commit

Permalink
Upgrade FCT Deobfuscator (#31)
Browse files Browse the repository at this point in the history
* - Rename not_pyobfuscate to fct

* - Add logging.py

* Delete logging.py

Signed-off-by: stickie <[email protected]>

* Adds logger to cli.py

Signed-off-by: stickie <[email protected]>

* Add logging to fct.py

Signed-off-by: stickie <[email protected]>

* Add logging to fct.py

Signed-off-by: stickie <[email protected]>

* Upgrade FCT deobfuscator and improve CLI
- Added --debug and --soft flags to argparse to modify logging config
- Added logging
- Added logging to FCT
- Added FCT deobfuscation for when byte strings aren't marshalled

Signed-off-by: stickie <[email protected]>

* Ran isort

Signed-off-by: stickie <[email protected]>

---------

Signed-off-by: stickie <[email protected]>
Signed-off-by: stickie <[email protected]>
  • Loading branch information
FieryIceStickie committed Mar 3, 2024
1 parent 0642990 commit d9cb91f
Show file tree
Hide file tree
Showing 3 changed files with 149 additions and 94 deletions.
95 changes: 79 additions & 16 deletions src/vipyr_deobf/cli.py
Original file line number Diff line number Diff line change
@@ -1,32 +1,56 @@
import argparse
from typing import Callable, NoReturn, TextIO, TypeVar
import logging
import logging.config
from typing import Callable, NoReturn, TextIO, TypeVar, override

from .deobfuscators.hyperion import format_hyperion, deobf_hyperion
from .deobfuscators.lzmaspam import format_lzma_b64, deobf_lzma_b64
from .deobfuscators.vare import format_vare, deobf_vare
from .deobfuscators.not_pyobfuscate import format_not_pyobfuscate, deobf_not_pyobfuscate
from .deobfuscators.fct import deobf_fct, format_fct
from .deobfuscators.hyperion import deobf_hyperion, format_hyperion
from .deobfuscators.lzmaspam import deobf_lzma_b64, format_lzma_b64
from .deobfuscators.vare import deobf_vare, format_vare
from .exceptions import DeobfuscationFailError, InvalidSchemaError

R = TypeVar('R')

supported_obfuscators: dict[str, tuple[Callable[[TextIO], R], Callable[[R], str]]] = {
'hyperion': (deobf_hyperion, format_hyperion),
'lzmaspam': (deobf_lzma_b64, format_lzma_b64),
'vare': (deobf_vare, format_vare),
'not_pyobfuscate': (deobf_not_pyobfuscate, format_not_pyobfuscate),
'vore': (deobf_vare, format_vare),
'fct': (deobf_fct, format_fct),
}

alias_dict: dict[str, str] = {
'vore': 'vare',
'hyperd': 'hyperion',
'fct-obfuscate': 'not_pyobfuscate',
'fct_obfuscate': 'fct',
'not_pyobfuscate': 'fct',
}


class Color:
clear = '\x1b[0m'
red = '\x1b[0;31m'
green = '\x1b[0;32m'
yellow = '\x1b[0;33m'
blue = '\x1b[0;34m'
white = '\x1b[0;37m'
bold_red = '\x1b[1;31m'
bold_green = '\x1b[1;32m'
bold_yellow = '\x1b[1;33m'
bold_blue = '\x1b[1;34m'
bold_white = '\x1b[1;37m'


class NoSoftWarning(logging.Filter):
@override
def filter(self, record: logging.LogRecord) -> bool:
return not record.msg.endswith('(Expected)')


def run_deobf(file: TextIO, deobf_type: str) -> NoReturn:
deobf_type = deobf_type.replace('-', '_')
deobf_type = alias_dict.get(deobf_type, deobf_type)
if deobf_type not in supported_obfuscators:
raise InvalidSchemaError([*supported_obfuscators])
raise InvalidSchemaError

deobf_func, format_func = supported_obfuscators[deobf_type]
results = deobf_func(file)
Expand All @@ -38,19 +62,58 @@ def run():
prog='Vipyr Deobfuscator',
description='Deobfuscates obfuscated scripts',
)
parser.add_argument('-p', '--path')
parser.add_argument('-t', '--type')
parser.add_argument('-p', '--path', required=True, help='path to obfuscated file')
parser.add_argument('-t', '--type', help='type of obfuscation used')
parser.add_argument('-d', '--debug', action='store_true', help='display debug logs (defaults to false)')
parser.add_argument('-s', '--soft', action='store_true', help='display expected warnings (defaults to false)')
args = parser.parse_args()

logger = logging.getLogger('deobf')
logging_config = {
'version': 1,
'disable_existing_loggers': False,
'formatters': {
'default': {
'format': f'{Color.bold_yellow}[{Color.blue}%(asctime)s{Color.bold_yellow}]'
f'{Color.bold_white}:{Color.green}%(levelname)s'
f'{Color.bold_white}:{Color.red}%(message)s{Color.clear}'
}
},
'handlers': {
'stdout': {
'class': 'logging.StreamHandler',
'formatter': 'default',
'stream': 'ext://sys.stdout',
'filters': [] if args.soft else ['no_soft_warning']
}
},
'filters': {
'no_soft_warning': {'()': 'src.vipyr_deobf.cli.NoSoftWarning'}
},
'loggers': {
'root': {
'level': 'DEBUG' if args.debug else 'INFO',
'handlers': ['stdout']
}
}
}
logging.config.dictConfig(logging_config)

try:
with open(args.path, 'r') as file:
run_deobf(file, args.type)
except FileNotFoundError:
print(f'{args.path} is not a valid path.')
except InvalidSchemaError as exc:
print(exc)
logger.error(f'{args.path} is not a valid path.')
except InvalidSchemaError:
logger.error(
f'Unsupported obfuscation schema.\n'
f'Supported obfuscation schemes include:\n'
f'{", ".join(supported_obfuscators)}'
)
except DeobfuscationFailError as exc:
print(f'Deobfuscation of {args.path} with schema <{args.type}> failed:')
print(exc)
logger.exception(f'Deobfuscation of {args.path} with schema <{args.type}> failed:')
for var, data in exc.env_vars.items():
print(f'{Color.bold_red}{var}{Color.clear}', data, sep='\n', end='\n\n')


if __name__ == '__main__':
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import ast
import base64
import binascii
import logging
import re
import zlib
from io import StringIO
Expand All @@ -9,6 +10,10 @@
from ..exceptions import DeobfuscationFailError
from ..utils import BYTES_WEBHOOK_REGEX

logger = logging.getLogger('deobf')

MAX_DEOBF_LIMIT = 1000


class ByteStringFinder(ast.NodeVisitor):
def __init__(self):
Expand All @@ -20,30 +25,25 @@ def visit_Constant(self, node):


def nab_surface_payload(surface_code: str) -> bytes:
logger.info('Nabbing surface payload')
try:
tree = ast.parse(surface_code)
# Other exceptions may appear, generalize this in the future
except SyntaxError as exc:
except SyntaxError:
logger.exception('Input text is not valid python')
raise DeobfuscationFailError(
'Input text is not valid python',
severity='critical',
status='expected',
exc=exc,
surface_code = surface_code
)
bsf = ByteStringFinder()
bsf.visit(tree)
if len(bsf.results) > 1:
logger.error('Multiple byte strings found')
raise DeobfuscationFailError(
f'Multiple byte strings found:\n{'\n'.join(map(repr, bsf.results))}',
severity='medium',
status='expected',
bsf_results = bsf.results
)
elif not bsf.results:
raise DeobfuscationFailError(
'No byte strings found in surface file',
severity='critical',
status='expected',
)
logger.error('No byte strings found in surface file')
raise DeobfuscationFailError()
return bsf.results[0]


Expand All @@ -56,14 +56,13 @@ def deobf_obf(obf_bytes: bytes) -> bytes:
try:
result = base64.b64decode(result)
except zlib.error:
logger.warning('base64 failed, trying base32 now')
result = base64.b32decode(result)
result = zlib.decompress(result)
except (binascii.Error, zlib.error) as exc:
except (binascii.Error, zlib.error):
logger.exception(f'Error in deobf_obf when trying to deobfuscate bytes')
raise DeobfuscationFailError(
f'Error in deobf_obf when trying to deobfuscate {obf_bytes}',
severity='critical',
status='expected',
exc=exc,
obf_bytes = obf_bytes
)
return result

Expand All @@ -73,35 +72,32 @@ def nab_bytes(marshalled_bytes: bytes) -> bytes:
Nabs the payload bytes from the marshalled code object
Tries the shortcut with the hardcoded payload index first, and then tries regex
"""
logger.info('Nabbing bytes from marshalled data')
try:
return index_nab_bytes(marshalled_bytes)
except DeobfuscationFailError:
pass
# TODO: in the future, raise a warning if this happens
return regex_nab_bytes(marshalled_bytes)
logger.warning('Index failed, using regex (Expected)')
return regex_nab_bytes(marshalled_bytes)


def index_nab_bytes(marshalled_bytes: bytes) -> bytes:
"""
Uses the hardcoded index of 73 to grab the payload
"""
logger.debug('Entering index_nab_bytes')
header = marshalled_bytes[73:79]
if header[:2] != b'\x02s':
raise DeobfuscationFailError(
'Bytes at index 73 is not header for bytes',
severity='low',
status='expected',
)
logger.warning('Bytes at index 73 is not header for bytes (Expected)')
raise DeobfuscationFailError()
payload_len = int.from_bytes(header[2:][::-1])
payload_start = 79
payload_end = payload_start + payload_len
payload = marshalled_bytes[payload_start:payload_end]
trailer = marshalled_bytes[payload_end:payload_end + 2]
if trailer != b'N)':
logger.error('Malformed marshal payload, length does not match trailer')
raise DeobfuscationFailError(
'Malformed marshal payload, length does not match trailer',
severity='low',
status='expected',
marshalled_bytes = marshalled_bytes
)
return payload

Expand All @@ -110,6 +106,7 @@ def regex_nab_bytes(marshalled_bytes: bytes) -> bytes:
"""
Uses regex to grab the payload
"""
logger.debug("Entering regex_nab_bytes (This shouldn't happen more than once)")
# Keep track of the current idx, so we can discard '\x02s' headers when they appear within other strings
current_idx = 0
rtn_bytes = []
Expand All @@ -121,42 +118,69 @@ def regex_nab_bytes(marshalled_bytes: bytes) -> bytes:
payload = marshalled_bytes[payload_start: payload_start + payload_len]
trailer = marshalled_bytes[payload_start + payload_len: payload_start + payload_len + 2]
if trailer != b'N)':
logger.warning('Malformed marshal payload, length does not match trailer')
raise DeobfuscationFailError(
'Malformed marshal payload, length does not match trailer',
severity='low',
status='expected',
marshalled_bytes = marshalled_bytes
)
rtn_bytes.append(payload)
current_idx = payload_start + payload_len + 2
rtn_bytes = [1, 2, 3]
if len(rtn_bytes) > 1:
logger.error('Multiple payloads found in bytes (Expected)')
raise DeobfuscationFailError(
'Multiple payloads found in bytes',
severity='low',
status='expected',
marshalled_bytes = marshalled_bytes,
rtn_bytes = rtn_bytes,
)
elif not rtn_bytes:
logger.warning('No payload found (Expected)')
raise DeobfuscationFailError(
marshalled_bytes = marshalled_bytes
)
return rtn_bytes[0]


def deobf_not_pyobfuscate(file: TextIO) -> bytes:
def deobf_fct(file: TextIO) -> bytes:
"""
Deobfuscates the not pyobfuscate schema
:return: Marshalled code object of the source code
"""
logger.info('Deobfuscating FCT format')
obf_bytes = nab_surface_payload(file.read())
while True:

for iteration in range(MAX_DEOBF_LIMIT):
logger.info(f'Deobfuscating bytes (iteration {iteration})')
marshalled_bytes = deobf_obf(obf_bytes)
if marshalled_bytes.startswith(b'exec((_)(b'):
logger.debug('Byte string is not marshalled')
obf_bytes = marshalled_bytes[11:-3]
continue
logger.debug('Byte string is marshalled')
try:
obf_bytes = nab_bytes(marshalled_bytes)
except DeobfuscationFailError:
return marshalled_bytes
if not obf_bytes:
return marshalled_bytes
logging.warning('Reached byte deobfuscation limit of 100, ending now')
raise DeobfuscationFailError(
marshalled_bytes = marshalled_bytes
)


def format_not_pyobfuscate(marshalled_bytes: bytes) -> str:
def format_fct(marshalled_bytes: bytes) -> str:
logger.info('Obfuscation complete, formatting output')
webhooks = BYTES_WEBHOOK_REGEX.findall(marshalled_bytes)

rtn_string = StringIO()
rtn_string.write(f'Marshalled bytes:\n{marshalled_bytes!r}\n')

try:
output = marshalled_bytes.decode()
except UnicodeDecodeError:
logger.info('Output appears to be marshalled')
rtn_string.write(f'Marshalled bytes:\n{marshalled_bytes!r}\n')
else:
logger.info('Output appears to be normal python')
rtn_string.write(f'Output:\n{output}')

if webhooks:
rtn_string.write('\nWebhooks found:\n')
Expand Down
Loading

0 comments on commit d9cb91f

Please sign in to comment.