From 31c30d00807fd106a48dc187d0e008390bd946b4 Mon Sep 17 00:00:00 2001
From: madeddy
Date: Sat, 20 Apr 2024 22:03:13 +0200
Subject: [PATCH] Refactoring of result state system and endreport

- result state counting in a single dict
- more result states
- more info in the endreport
- quote the file header in the exception if it was manipulated
---
 deobfuscate.py |   2 +
 unrpyc.py      | 145 +++++++++++++++++++++++++++----------------------
 2 files changed, 82 insertions(+), 65 deletions(-)

diff --git a/deobfuscate.py b/deobfuscate.py
index 2fff825..5b1347c 100644
--- a/deobfuscate.py
+++ b/deobfuscate.py
@@ -294,6 +294,7 @@ def read_ast(f, context):
 
     if not raw_datas:
         diagnosis.append("All strategies failed. Unable to extract data")
+        context.set_state('spoofed')
         raise ValueError("\n".join(diagnosis))
 
     if len(raw_datas) != 1:
@@ -311,6 +312,7 @@ def read_ast(f, context):
             return stmts
 
     diagnosis.append("All strategies failed. Unable to deobfuscate data")
+    context.set_state('spoofed')
     raise ValueError("\n".join(diagnosis))
 
 
diff --git a/unrpyc.py b/unrpyc.py
index 211b8d2..93e7d16 100755
--- a/unrpyc.py
+++ b/unrpyc.py
@@ -51,14 +51,14 @@ def cpu_count():
 class Context:
     def __init__(self):
         self.log_contents = []
-        self.error = None
+        self.state = None
         self.value = None
 
     def log(self, message):
         self.log_contents.append(message)
 
-    def set_error(self, error):
-        self.error = error
+    def set_state(self, state):
+        self.state = state
 
     def set_result(self, value):
         self.value = value
@@ -76,7 +76,7 @@ def read_ast_from_file(in_file, context):
     # v1 files are just a zlib compressed pickle blob containing some data and the ast
     # v2 files contain a basic archive structure that can be parsed to find the same blob
     raw_contents = in_file.read()
-
+    l1_start = raw_contents[:50]
     is_rpyc_v1 = False
 
     if not raw_contents.startswith(b"RENPY RPC2"):
@@ -110,7 +110,8 @@ def read_ast_from_file(in_file, context):
         if 1 not in chunks:
             raise BadRpycException(
                 "Unable to find the right slot to load from the rpyc file. The file header "
-                "structure has been changed.")
+                "structure has been changed.\n"
+                f"File header: {l1_start}")
 
         contents = chunks[1]
 
@@ -135,9 +136,9 @@ def read_ast_from_file(in_file, context):
     return stmts
 
 
-def decompile_rpyc(input_filename, context, overwrite=False, dump=False,
-                   comparable=False, no_pyexpr=False, translator=None,
-                   init_offset=False, try_harder=False, sl_custom_names=None):
+def decompile_rpyc(input_filename, context, overwrite=False, try_harder=False, dump=False,
+                   comparable=False, no_pyexpr=False, translator=None, init_offset=False,
+                   sl_custom_names=None):
 
     # Output filename is input filename but with .rpy extension
     if dump:
@@ -151,8 +152,9 @@ def decompile_rpyc(input_filename, context, overwrite=False, dump=False,
     context.log(f'Decompiling {input_filename} to {out_filename.name}...')
 
     if not overwrite and out_filename.exists():
-        context.log("Output file already exists. Pass --clobber to overwrite.")
-        return False  # Don't stop decompiling if one file already exists
+        context.log("Target file exists already! Skipping.")
+        context.set_state('skip')
+        return  # Don't stop decompiling if a file already exists
 
     with input_filename.open('rb') as in_file:
         if try_harder:
@@ -168,7 +170,8 @@ def decompile_rpyc(input_filename, context, overwrite=False, dump=False,
                                      init_offset=init_offset, sl_custom_names=sl_custom_names)
 
         decompiler.pprint(out_file, ast, options)
-    return True
+
+    context.set_state('ok')
 
 def extract_translations(input_filename, language, context):
     context.log(f'Extracting translations from {input_filename}...')
@@ -182,6 +185,40 @@ def extract_translations(input_filename, language, context):
     return pickle_safe_dumps(translator.dialogue), translator.strings
 
 
+def worker(arg_tup):
+    args, filename = arg_tup
+    context = Context()
+
+    try:
+        if args.write_translation_file:
+            result = extract_translations(filename, args.language, context)
+        else:
+            if args.translation_file is not None:
+                translator = translate.Translator(None)
+                translator.language, translator.dialogue, translator.strings = (
+                    pickle_loads(args.translations))
+            else:
+                translator = None
+            result = decompile_rpyc(
+                filename, context, args.clobber, try_harder=args.try_harder, dump=args.dump,
+                no_pyexpr=args.no_pyexpr, comparable=args.comparable, translator=translator,
+                init_offset=args.init_offset, sl_custom_names=args.sl_custom_names
+            )
+
+        context.set_result(result)
+
+    except BadRpycException:
+        context.set_state('spoofed')
+        context.log(f'Error while trying to read the header of {filename}:')
+        context.log(traceback.format_exc())
+    except Exception:
+        context.set_state('fail')
+        context.log(f'Error while decompiling {filename}:')
+        context.log(traceback.format_exc())
+
+    return context
+
+
 def parse_sl_custom_names(unparsed_arguments):
     # parse a list of strings in the format
     # classname=name-nchildren into {classname: (name, nchildren)}
@@ -216,35 +253,6 @@ def parse_sl_custom_names(unparsed_arguments):
     return parsed_arguments
 
 
-def worker(arg_tup):
-    args, filename = arg_tup
-    context = Context()
-
-    try:
-        if args.write_translation_file:
-            result = extract_translations(filename, args.language, context)
-        else:
-            if args.translation_file is not None:
-                translator = translate.Translator(None)
-                translator.language, translator.dialogue, translator.strings = (
-                    pickle_loads(args.translations))
-            else:
-                translator = None
-            result = decompile_rpyc(filename, context, args.clobber, args.dump,
-                                    no_pyexpr=args.no_pyexpr, comparable=args.comparable,
-                                    translator=translator, init_offset=args.init_offset,
-                                    try_harder=args.try_harder,
-                                    sl_custom_names=args.sl_custom_names)
-
-        context.set_result(result)
-
-    except Exception as e:
-        context.set_error(e)
-        context.log(f'Error while decompiling {filename}:')
-        context.log(traceback.format_exc())
-
-    return context
-
 
 def main():
     if not sys.version_info[:2] >= (3, 9):
@@ -363,6 +371,8 @@ def main():
                     version=f"{__title__} {__version__}")
     args = ap.parse_args()
 
+    # Basic start state
+    state_count = dict({'total': 0, 'ok': 0, 'fail': 0, 'skip': 0, 'spoofed': 0})
 
     # Catch impossible arg combinations so they don't produce strange errors or fail silently
     if (args.no_pyexpr or args.comparable) and not args.dump:
@@ -424,6 +434,7 @@ def traverse(inpath):
     if not worklist:
         print("Found no script files to decompile.")
         return
+    state_count['total'] = len(worklist)
 
     # If a big file starts near the end, there could be a long time with only one thread running,
     # which is inefficient. Avoid this by starting big files first.
@@ -431,51 +442,55 @@ def traverse(inpath):
     worklist = [(args, x) for x in worklist]
 
     results = []
-
     if args.processes > 1 and len(worklist) > 5:
         with Pool(args.processes) as pool:
            for result in pool.imap(worker, worklist, 1):
                 results.append(result)
-
-                for line in result.log_contents:
-                    print(line)
-
     else:
         for result in map(worker, worklist):
             results.append(result)
 
-            for line in result.log_contents:
-                print(line)
-
     if args.write_translation_file:
         print(f'Writing translations to {args.write_translation_file}...')
 
         translated_dialogue = {}
         translated_strings = {}
-        good = 0
-        bad = 0
         for result in results:
             if not result.value:
-                bad += 1
                 continue
-            good += 1
             translated_dialogue.update(pickle_loads(result.value[0]))
             translated_strings.update(result.value[1])
 
         with args.write_translation_file.open('wb') as out_file:
             pickle_safe_dump((args.language, translated_dialogue, translated_strings), out_file)
 
-    else:
-        # Check per file if everything went well and report back
-        good = sum(1 for i in results if i.error is None)
-        bad = sum(1 for i in results if i.error is not None)
-
-        if bad == 0:
-            print(f'Decompilation of {good} script file{"s" if good > 1 else ""} successful')
-        elif good == 0:
-            print(f'Decompilation of {bad} file{"s" if bad > 1 else ""} failed')
-        else:
-            print(f'Decompilation of {good} file{"s" if good > 1 else ""} successful\n'
-                  f'but decompilation of {bad} file{"s" if bad > 1 else ""} failed')
-
+    # Tally the result state of every file and print its collected log messages
+    for res_inst in results:
+        state_count[res_inst.state] += 1
+
+        for log_entry in res_inst.log_contents:
+            print(log_entry)
+
+    def plural_fmt(inp):
+        """Returns the count followed by 'file' or 'files', depending on the count."""
+        return f"{inp} file{'s'[:inp^1]}"
+
+    endreport = (
+        "\nThis decompilation run of Unrpyc had the following outcome:\n"
+        f"{55 * '-'}\n"
+        f" A total of {plural_fmt(state_count['total'])} to decompile were found.\n"
+        f" > {plural_fmt(state_count['ok'])} were successfully decompiled.\n"
+        f" > {plural_fmt(state_count['fail'])} failed due to various errors.\n"
+        f" > {plural_fmt(state_count['spoofed'])} had a wrong header or were manipulated.\n"
+        f" > {plural_fmt(state_count['skip'])} already existed and were skipped.\n"
+    )
+    # Add hints in case problems were encountered
+    skipped = ("To overwrite existing files, use the option '--clobber'. "
+               if state_count['skip'] != 0 else "")
+    spoofed = (
+        "For manipulated files, the option '--try-harder' can be attempted."
+        if state_count['spoofed'] != 0 else "")
+    errors = ("Errors were found. Check the exceptions in the log for more information."
+              if state_count['fail'] != 0 else "")
+    print(endreport, skipped, spoofed, errors)
 
 if __name__ == '__main__':
     main()
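Note (not part of the patch): below is a minimal, runnable sketch of how the result-state
tally and the plural_fmt helper introduced by this commit behave. The Context class is
reduced to the fields used here, and the three hard-coded outcomes are invented purely
for illustration.

    class Context:
        """Per-file result carrier, trimmed to the parts used in this sketch."""
        def __init__(self):
            self.log_contents = []
            self.state = None

        def set_state(self, state):
            self.state = state


    def plural_fmt(inp):
        # 's'[:inp ^ 1] is '' when inp == 1 and 's' otherwise,
        # so 1 -> "1 file" and 3 -> "3 files".
        return f"{inp} file{'s'[:inp ^ 1]}"


    # Pretend three files were processed with these (made-up) outcomes.
    results = []
    for outcome in ('ok', 'ok', 'skip'):
        ctx = Context()
        ctx.set_state(outcome)
        results.append(ctx)

    # Single dict holding all result-state counters, as in the patch.
    state_count = {'total': len(results), 'ok': 0, 'fail': 0, 'skip': 0, 'spoofed': 0}
    for res in results:
        state_count[res.state] += 1

    print(f"{plural_fmt(state_count['ok'])} decompiled, "
          f"{plural_fmt(state_count['skip'])} skipped, "
          f"out of {plural_fmt(state_count['total'])} found.")
    # -> 2 files decompiled, 1 file skipped, out of 3 files found.

The 's'[:inp ^ 1] trick only works for non-negative integer counts, which holds here
because every counter starts at zero and is only ever incremented.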