Skip to content

Commit

Permalink
Some rewrites of the new output logic.
Browse files Browse the repository at this point in the history
  • Loading branch information
CensoredUsername committed Apr 27, 2024
1 parent 31c30d0 commit 6cc0633
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 57 deletions.
2 changes: 0 additions & 2 deletions deobfuscate.py
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,6 @@ def read_ast(f, context):

if not raw_datas:
diagnosis.append("All strategies failed. Unable to extract data")
context.set_state('spoofed')
raise ValueError("\n".join(diagnosis))

if len(raw_datas) != 1:
Expand All @@ -312,7 +311,6 @@ def read_ast(f, context):
return stmts

diagnosis.append("All strategies failed. Unable to deobfuscate data")
context.set_state('spoofed')
raise ValueError("\n".join(diagnosis))


Expand Down
154 changes: 99 additions & 55 deletions unrpyc.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,19 +50,35 @@ def cpu_count():

class Context:
def __init__(self):
# list of log lines to print
self.log_contents = []
self.state = None

# any exception that occurred
self.error = None

# state of what case was encountered
# options:
# error: (default) an unexpected exception was raised
# ok: the process concluded successfully
# bad_header: the given file cannot be parsed as a normal rpyc file
# skip: the given file was skipped due to a preexisting output file
self.state = "error"

# return value from the worker, if any
self.value = None

def log(self, message):
self.log_contents.append(message)

def set_state(self, state):
self.state = state
def set_error(self, error):
self.error = error

def set_result(self, value):
self.value = value

def set_state(self, state):
self.state = state


class BadRpycException(Exception):
"""Exception raised when we couldn't parse the rpyc archive format"""
Expand All @@ -76,7 +92,7 @@ def read_ast_from_file(in_file, context):
# v1 files are just a zlib compressed pickle blob containing some data and the ast
# v2 files contain a basic archive structure that can be parsed to find the same blob
raw_contents = in_file.read()
l1_start = raw_contents[:50]
file_start = raw_contents[:50]
is_rpyc_v1 = False

if not raw_contents.startswith(b"RENPY RPC2"):
Expand Down Expand Up @@ -108,19 +124,20 @@ def read_ast_from_file(in_file, context):
chunks[slot] = raw_contents[start: start + length]

if 1 not in chunks:
context.set_state('bad_header')
raise BadRpycException(
"Unable to find the right slot to load from the rpyc file. The file header "
"structure has been changed."
f"File header:{l1_start}")
f"structure has been changed. File header: {file_start}")

contents = chunks[1]

try:
contents = zlib.decompress(contents)
except Exception:
context.set_state('bad_header')
raise BadRpycException(
"Did not find a zlib compressed blob where it was expected. Either the header has been "
"modified or the file structure has been changed.") from None
f"modified or the file structure has been changed. File header: {file_start}") from None

# add some detection of ren'py 7 files
if is_rpyc_v1 or pickle_detect_python2(contents):
Expand Down Expand Up @@ -149,12 +166,13 @@ def decompile_rpyc(input_filename, context, overwrite=False, try_harder=False, d
ext = '.rpym'
out_filename = input_filename.with_suffix(ext)

context.log(f'Decompiling {input_filename} to {out_filename.name}...')

if not overwrite and out_filename.exists():
context.log("Target file exists already! Skipping.")
context.log(f'Skipping {input_filename}. {out_filename.name} already exists.')
context.set_state('skip')
return # Don't stop decompiling if a file already exists
return

context.log(f'Decompiling {input_filename} to {out_filename.name} ...')

with input_filename.open('rb') as in_file:
if try_harder:
Expand Down Expand Up @@ -192,27 +210,24 @@ def worker(arg_tup):
try:
if args.write_translation_file:
result = extract_translations(filename, args.language, context)
context.set_result(result)

else:
if args.translation_file is not None:
translator = translate.Translator(None)
translator.language, translator.dialogue, translator.strings = (
pickle_loads(args.translations))
else:
translator = None
result = decompile_rpyc(

decompile_rpyc(
filename, context, args.clobber, try_harder=args.try_harder, dump=args.dump,
no_pyexpr=args.no_pyexpr, comparable=args.comparable, translator=translator,
init_offset=args.init_offset, sl_custom_names=args.sl_custom_names
)

context.set_result(result)

except BadRpycException:
context.set_state('spoofed')
context.log(f'Error while trying to read the header of {filename}:')
context.log(traceback.format_exc())
except Exception:
context.set_state('fail')
except Exception as e:
context.set_error(e)
context.log(f'Error while decompiling {filename}:')
context.log(traceback.format_exc())

Expand Down Expand Up @@ -254,6 +269,11 @@ def parse_sl_custom_names(unparsed_arguments):
return parsed_arguments


def plural_s(n, unit):
"""Correctly uses the plural form of 'unit' when 'n' is not one"""
return f"1 {unit}" if n == 1 else f"{n} {unit}s"


def main():
if not sys.version_info[:2] >= (3, 9):
raise Exception(
Expand Down Expand Up @@ -371,25 +391,23 @@ def main():
version=f"{__title__} {__version__}")

args = ap.parse_args()
# Basic start state
state_count = dict({'total': 0, 'ok': 0, 'fail': 0, 'skip': 0, 'spoofed': 0})

# Catch impossible arg combinations so they don't produce strange errors or fail silently
if (args.no_pyexpr or args.comparable) and not args.dump:
raise ap.error(
ap.error(
"Arguments 'comparable' and 'no_pyexpr' are not usable without 'dump'.")

if ((args.try_harder or args.dump)
and (args.write_translation_file or args.translation_file or args.language)):
raise ap.error(
ap.error(
"Arguments 'try_harder' and/or 'dump' are not usable with the translation "
"feature.")

# Fail early to avoid wasting time going through the files
if (args.write_translation_file
and not args.clobber
and args.write_translation_file.exists()):
raise ap.error(
ap.error(
"Output translation file already exists. Pass --clobber to overwrite.")

if args.translation_file:
Expand Down Expand Up @@ -434,22 +452,38 @@ def traverse(inpath):
if not worklist:
print("Found no script files to decompile.")
return
state_count['total'] = len(worklist)

if args.processes > len(worklist):
args.processes = len(worklist)

print(f"Found {plural_s(len(worklist), 'file')} to process. "
f"Performing decompilation using {plural_s(args.processes, 'worker')}.")

# If a big file starts near the end, there could be a long time with only one thread running,
# which is inefficient. Avoid this by starting big files first.
worklist.sort(key=lambda x: x.stat().st_size, reverse=True)
worklist = [(args, x) for x in worklist]

results = []
if args.processes > 1 and len(worklist) > 5:
if args.processes > 1:
with Pool(args.processes) as pool:
for result in pool.imap(worker, worklist, 1):
results.append(result)

for line in result.log_contents:
print(line)

print("")

else:
for result in map(worker, worklist):
results.append(result)

for line in result.log_contents:
print(line)

print("")

if args.write_translation_file:
print(f'Writing translations to {args.write_translation_file}...')
translated_dialogue = {}
Expand All @@ -462,35 +496,45 @@ def traverse(inpath):
with args.write_translation_file.open('wb') as out_file:
pickle_safe_dump((args.language, translated_dialogue, translated_strings), out_file)

# Get infos per instance and write them to their targets
for res_inst in results:
state_count[res_inst.state] += 1

for log_entry in res_inst.log_contents:
print(log_entry)

def plural_fmt(inp):
"""returns singular or plural of term file(s) contingent of input count"""
return f"{inp} file{'s'[:inp^1]}"

endreport = (
"\nThis decompile run of Unrpyc has the following outcome:\n"
f"{55 * '-'}\n"
f" A total of {plural_fmt(state_count['total'])} to decompile where found.\n"
f" > {plural_fmt(state_count['ok'])} could be successful decompiled.\n"
f" > {plural_fmt(state_count['fail'])} failed due to diverse errors.\n"
f" > {plural_fmt(state_count['spoofed'])} with wrong header or other manipulation.\n"
f" > {plural_fmt(state_count['skip'])} already exist and have been skipped.\n"
)
# add pointers if we encounter problems
skipped = ("To overwrite existing files use option '--clobber'. "
if state_count['skip'] != 0 else "")
spoofed = (
"In case of manipulation, the --try-harder option can be attempted."
if state_count['spoofed'] != 0 else "")
errors = ("Errors were found. Check the exceptions in the log for more info about this."
if state_count['fail'] != 0 else "")
print(endreport, skipped, spoofed, errors)
success = sum(result.state == "ok" for result in results)
skipped = sum(result.state == "skip" for result in results)
failed = sum(result.state == "error" for result in results)
broken = sum(result.state == "bad_header" for result in results)

print("")
print(f"{55 * '-'}")
print(f"{__title__} {__version__} results summary:")
print(f"{55 * '-'}")
print(f"Processed {plural_s(len(results), 'file')}.")

if args.write_translation_file:
print(f"> {plural_s(success, 'file')} were successfully analyzed.")
else:
print(f"> {plural_s(success, 'file')} were successfully decompiled.")

if broken:
print(f"> {plural_s(broken, 'file')} did not have the correct header, "
"these were ignored.")

if failed:
print(f"> {plural_s(failed, 'file')} failed to decompile due to errors.")

if skipped:
print(f"> {plural_s(skipped, 'file')} were skipped as the output file already existed.")


if skipped:
print("")
print("To overwrite existing files instead of skipping them, use the --clobber flag.")

if broken:
print("")
print("To attempt to bypass modifications to the file header, use the --try-harder flag.")

if failed:
print("")
print("Errors were encountered during decompilation. Check the log for more information.")
print("When making a bug report, please include this entire log.")

if __name__ == '__main__':
main()

0 comments on commit 6cc0633

Please sign in to comment.