Skip to content

Commit

Permalink
Refactoring of result state system and endreport
Browse files Browse the repository at this point in the history
- result state counting in single dict
- more result states
- more infos in endreport
- cite the header in exception if manipulated
  • Loading branch information
madeddy authored and CensoredUsername committed Apr 27, 2024
1 parent dcd9246 commit 31c30d0
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 65 deletions.
2 changes: 2 additions & 0 deletions deobfuscate.py
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,7 @@ def read_ast(f, context):

if not raw_datas:
diagnosis.append("All strategies failed. Unable to extract data")
context.set_state('spoofed')
raise ValueError("\n".join(diagnosis))

if len(raw_datas) != 1:
Expand All @@ -311,6 +312,7 @@ def read_ast(f, context):
return stmts

diagnosis.append("All strategies failed. Unable to deobfuscate data")
context.set_state('spoofed')
raise ValueError("\n".join(diagnosis))


Expand Down
145 changes: 80 additions & 65 deletions unrpyc.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,14 @@ def cpu_count():
class Context:
def __init__(self):
self.log_contents = []
self.error = None
self.state = None
self.value = None

def log(self, message):
self.log_contents.append(message)

def set_error(self, error):
self.error = error
def set_state(self, state):
self.state = state

def set_result(self, value):
self.value = value
Expand All @@ -76,7 +76,7 @@ def read_ast_from_file(in_file, context):
# v1 files are just a zlib compressed pickle blob containing some data and the ast
# v2 files contain a basic archive structure that can be parsed to find the same blob
raw_contents = in_file.read()

l1_start = raw_contents[:50]
is_rpyc_v1 = False

if not raw_contents.startswith(b"RENPY RPC2"):
Expand Down Expand Up @@ -110,7 +110,8 @@ def read_ast_from_file(in_file, context):
if 1 not in chunks:
raise BadRpycException(
"Unable to find the right slot to load from the rpyc file. The file header "
"structure has been changed.")
"structure has been changed."
f"File header:{l1_start}")

contents = chunks[1]

Expand All @@ -135,9 +136,9 @@ def read_ast_from_file(in_file, context):
return stmts


def decompile_rpyc(input_filename, context, overwrite=False, dump=False,
comparable=False, no_pyexpr=False, translator=None,
init_offset=False, try_harder=False, sl_custom_names=None):
def decompile_rpyc(input_filename, context, overwrite=False, try_harder=False, dump=False,
comparable=False, no_pyexpr=False, translator=None, init_offset=False,
sl_custom_names=None):

# Output filename is input filename but with .rpy extension
if dump:
Expand All @@ -151,8 +152,9 @@ def decompile_rpyc(input_filename, context, overwrite=False, dump=False,
context.log(f'Decompiling {input_filename} to {out_filename.name}...')

if not overwrite and out_filename.exists():
context.log("Output file already exists. Pass --clobber to overwrite.")
return False # Don't stop decompiling if one file already exists
context.log("Target file exists already! Skipping.")
context.set_state('skip')
return # Don't stop decompiling if a file already exists

with input_filename.open('rb') as in_file:
if try_harder:
Expand All @@ -168,7 +170,8 @@ def decompile_rpyc(input_filename, context, overwrite=False, dump=False,
init_offset=init_offset, sl_custom_names=sl_custom_names)

decompiler.pprint(out_file, ast, options)
return True

context.set_state('ok')

def extract_translations(input_filename, language, context):
context.log(f'Extracting translations from {input_filename}...')
Expand All @@ -182,6 +185,40 @@ def extract_translations(input_filename, language, context):
return pickle_safe_dumps(translator.dialogue), translator.strings


def worker(arg_tup):
args, filename = arg_tup
context = Context()

try:
if args.write_translation_file:
result = extract_translations(filename, args.language, context)
else:
if args.translation_file is not None:
translator = translate.Translator(None)
translator.language, translator.dialogue, translator.strings = (
pickle_loads(args.translations))
else:
translator = None
result = decompile_rpyc(
filename, context, args.clobber, try_harder=args.try_harder, dump=args.dump,
no_pyexpr=args.no_pyexpr, comparable=args.comparable, translator=translator,
init_offset=args.init_offset, sl_custom_names=args.sl_custom_names
)

context.set_result(result)

except BadRpycException:
context.set_state('spoofed')
context.log(f'Error while trying to read the header of {filename}:')
context.log(traceback.format_exc())
except Exception:
context.set_state('fail')
context.log(f'Error while decompiling {filename}:')
context.log(traceback.format_exc())

return context


def parse_sl_custom_names(unparsed_arguments):
# parse a list of strings in the format
# classname=name-nchildren into {classname: (name, nchildren)}
Expand Down Expand Up @@ -216,35 +253,6 @@ def parse_sl_custom_names(unparsed_arguments):

return parsed_arguments

def worker(arg_tup):
args, filename = arg_tup
context = Context()

try:
if args.write_translation_file:
result = extract_translations(filename, args.language, context)
else:
if args.translation_file is not None:
translator = translate.Translator(None)
translator.language, translator.dialogue, translator.strings = (
pickle_loads(args.translations))
else:
translator = None
result = decompile_rpyc(filename, context, args.clobber, args.dump,
no_pyexpr=args.no_pyexpr, comparable=args.comparable,
translator=translator, init_offset=args.init_offset,
try_harder=args.try_harder,
sl_custom_names=args.sl_custom_names)

context.set_result(result)

except Exception as e:
context.set_error(e)
context.log(f'Error while decompiling {filename}:')
context.log(traceback.format_exc())

return context


def main():
if not sys.version_info[:2] >= (3, 9):
Expand Down Expand Up @@ -363,6 +371,8 @@ def main():
version=f"{__title__} {__version__}")

args = ap.parse_args()
# Basic start state
state_count = dict({'total': 0, 'ok': 0, 'fail': 0, 'skip': 0, 'spoofed': 0})

# Catch impossible arg combinations so they don't produce strange errors or fail silently
if (args.no_pyexpr or args.comparable) and not args.dump:
Expand Down Expand Up @@ -424,58 +434,63 @@ def traverse(inpath):
if not worklist:
print("Found no script files to decompile.")
return
state_count['total'] = len(worklist)

# If a big file starts near the end, there could be a long time with only one thread running,
# which is inefficient. Avoid this by starting big files first.
worklist.sort(key=lambda x: x.stat().st_size, reverse=True)
worklist = [(args, x) for x in worklist]

results = []

if args.processes > 1 and len(worklist) > 5:
with Pool(args.processes) as pool:
for result in pool.imap(worker, worklist, 1):
results.append(result)

for line in result.log_contents:
print(line)

else:
for result in map(worker, worklist):
results.append(result)

for line in result.log_contents:
print(line)

if args.write_translation_file:
print(f'Writing translations to {args.write_translation_file}...')
translated_dialogue = {}
translated_strings = {}
good = 0
bad = 0
for result in results:
if not result.value:
bad += 1
continue
good += 1
translated_dialogue.update(pickle_loads(result.value[0]))
translated_strings.update(result.value[1])
with args.write_translation_file.open('wb') as out_file:
pickle_safe_dump((args.language, translated_dialogue, translated_strings), out_file)

else:
# Check per file if everything went well and report back
good = sum(1 for i in results if i.error is None)
bad = sum(1 for i in results if i.error is not None)

if bad == 0:
print(f'Decompilation of {good} script file{"s" if good > 1 else ""} successful')
elif good == 0:
print(f'Decompilation of {bad} file{"s" if bad > 1 else ""} failed')
else:
print(f'Decompilation of {good} file{"s" if good > 1 else ""} successful\n'
f'but decompilation of {bad} file{"s" if bad > 1 else ""} failed')

# Get infos per instance and write them to their targets
for res_inst in results:
state_count[res_inst.state] += 1

for log_entry in res_inst.log_contents:
print(log_entry)

def plural_fmt(inp):
"""returns singular or plural of term file(s) contingent of input count"""
return f"{inp} file{'s'[:inp^1]}"

endreport = (
"\nThis decompile run of Unrpyc has the following outcome:\n"
f"{55 * '-'}\n"
f" A total of {plural_fmt(state_count['total'])} to decompile where found.\n"
f" > {plural_fmt(state_count['ok'])} could be successful decompiled.\n"
f" > {plural_fmt(state_count['fail'])} failed due to diverse errors.\n"
f" > {plural_fmt(state_count['spoofed'])} with wrong header or other manipulation.\n"
f" > {plural_fmt(state_count['skip'])} already exist and have been skipped.\n"
)
# add pointers if we encounter problems
skipped = ("To overwrite existing files use option '--clobber'. "
if state_count['skip'] != 0 else "")
spoofed = (
"In case of manipulation, the --try-harder option can be attempted."
if state_count['spoofed'] != 0 else "")
errors = ("Errors were found. Check the exceptions in the log for more info about this."
if state_count['fail'] != 0 else "")
print(endreport, skipped, spoofed, errors)

if __name__ == '__main__':
main()

0 comments on commit 31c30d0

Please sign in to comment.