diff --git a/main.py b/main.py index d2745c1..867658d 100644 --- a/main.py +++ b/main.py @@ -346,6 +346,10 @@ raise FileNotFoundError(Fore.RED + "The plugin {} is not found. Please verify that you are using the correct file path.".format( clean_control["plot"][key]["plugin"])) + # Create plot directory to save plots + print_info("Creating directory to save plots.") + os.makedirs("data/plots", exist_ok=True) + print_info("Generating directive list for worker nodes.") # Generate and slice directive list that will be sent out to the workers clean_directive_list = sst.generate_clean(clean_control["plot"], ROOT_PATH + "/data/plots", ROOT_PATH + "/data") @@ -603,7 +607,6 @@ except subprocess.SubprocessError: logger.warning("ERROR: Evaluation for model {} failed. Please review logfile {} for error diagnostics.".format(model_name, file_output)) - # Close the training plugin log file fout.close() @@ -635,6 +638,24 @@ logger.warning("INFO: Received task list {} from manager.".format(task_list)) if task_list != []: + logger.warning("INFO: Beginning cleaning stage plotting.") + + for task in task_list: + logger.warning("INFO: Generating plot {}.".format(task[2])) + file_output = "data/.logs/worker-1/{}-plot-{}".format(TIME, task[2]) + logger.warning("INFO: Saving output of plotting plugin to logfile {}.".format(file_output)) + fout = open(file_output, "wt") + + clean_param = clean_factory(task[1], task[2], task[3], ROOT_PATH) + + try: + subprocess.run([PYTHON_PATH, task[0], "clean", clean_param], stdout=fout, stderr=fout) + + except subprocess.SubprocessError: + logger.warning("ERROR: Plotting failed. Please review logfile {} for error diagnostics.".format(file_output)) + + fout.close() + comm.send(1, dest=0, tag=1) else: @@ -812,7 +833,6 @@ except subprocess.SubprocessError: logger.warning("ERROR: Evaluation for model {} failed. Please review logfile {} for error diagnostics.".format(model_name, file_output)) - # Close the training plugin log file fout.close() @@ -844,6 +864,24 @@ logger.warning("INFO: Received task list {} from manager.".format(task_list)) if task_list != []: + logger.warning("INFO: Beginning cleaning stage plotting.") + + for task in task_list: + logger.warning("INFO: Generating plot {}.".format(task[2])) + file_output = "data/.logs/worker-2/{}-plot-{}".format(TIME, task[2]) + logger.warning("INFO: Saving output of plotting plugin to logfile {}.".format(file_output)) + fout = open(file_output, "wt") + + clean_param = clean_factory(task[1], task[2], task[3], ROOT_PATH) + + try: + subprocess.run([PYTHON_PATH, task[0], "clean", clean_param], stdout=fout, stderr=fout) + + except subprocess.SubprocessError: + logger.warning("ERROR: Plotting failed. Please review logfile {} for error diagnostics.".format(file_output)) + + fout.close() + comm.send(1, dest=0, tag=2) else: @@ -1021,7 +1059,6 @@ except subprocess.SubprocessError: logger.warning("ERROR: Evaluation for model {} failed. Please review logfile {} for error diagnostics.".format(model_name, file_output)) - # Close the training plugin log file fout.close() @@ -1053,6 +1090,24 @@ logger.warning("INFO: Received task list {} from manager.".format(task_list)) if task_list != []: + logger.warning("INFO: Beginning cleaning stage plotting.") + + for task in task_list: + logger.warning("INFO: Generating plot {}.".format(task[2])) + file_output = "data/.logs/worker-3/{}-plot-{}".format(TIME, task[2]) + logger.warning("INFO: Saving output of plotting plugin to logfile {}.".format(file_output)) + fout = open(file_output, "wt") + + clean_param = clean_factory(task[1], task[2], task[3], ROOT_PATH) + + try: + subprocess.run([PYTHON_PATH, task[0], "clean", clean_param], stdout=fout, stderr=fout) + + except subprocess.SubprocessError: + logger.warning("ERROR: Plotting failed. Please review logfile {} for error diagnostics.".format(file_output)) + + fout.close() + comm.send(1, dest=0, tag=3) else: @@ -1230,7 +1285,6 @@ except subprocess.SubprocessError: logger.warning("ERROR: Evaluation for model {} failed. Please review logfile {} for error diagnostics.".format(model_name, file_output)) - # Close the training plugin log file fout.close() @@ -1262,6 +1316,24 @@ logger.warning("INFO: Received task list {} from manager.".format(task_list)) if task_list != []: + logger.warning("INFO: Beginning cleaning stage plotting.") + + for task in task_list: + logger.warning("INFO: Generating plot {}.".format(task[2])) + file_output = "data/.logs/worker-4/{}-plot-{}".format(TIME, task[2]) + logger.warning("INFO: Saving output of plotting plugin to logfile {}.".format(file_output)) + fout = open(file_output, "wt") + + clean_param = clean_factory(task[1], task[2], task[3], ROOT_PATH) + + try: + subprocess.run([PYTHON_PATH, task[0], "clean", clean_param], stdout=fout, stderr=fout) + + except subprocess.SubprocessError: + logger.warning("ERROR: Plotting failed. Please review logfile {} for error diagnostics.".format(file_output)) + + fout.close() + comm.send(1, dest=0, tag=4) else: @@ -1439,7 +1511,6 @@ except subprocess.SubprocessError: logger.warning("ERROR: Evaluation for model {} failed. Please review logfile {} for error diagnostics.".format(model_name, file_output)) - # Close the training plugin log file fout.close() @@ -1471,6 +1542,24 @@ logger.warning("INFO: Received task list {} from manager.".format(task_list)) if task_list != []: + logger.warning("INFO: Beginning cleaning stage plotting.") + + for task in task_list: + logger.warning("INFO: Generating plot {}.".format(task[2])) + file_output = "data/.logs/worker-5/{}-plot-{}".format(TIME, task[2]) + logger.warning("INFO: Saving output of plotting plugin to logfile {}.".format(file_output)) + fout = open(file_output, "wt") + + clean_param = clean_factory(task[1], task[2], task[3], ROOT_PATH) + + try: + subprocess.run([PYTHON_PATH, task[0], "clean", clean_param], stdout=fout, stderr=fout) + + except subprocess.SubprocessError: + logger.warning("ERROR: Plotting failed. Please review logfile {} for error diagnostics.".format(file_output)) + + fout.close() + comm.send(1, dest=0, tag=5) else: @@ -1648,7 +1737,6 @@ except subprocess.SubprocessError: logger.warning("ERROR: Evaluation for model {} failed. Please review logfile {} for error diagnostics.".format(model_name, file_output)) - # Close the training plugin log file fout.close() @@ -1680,6 +1768,24 @@ logger.warning("INFO: Received task list {} from manager.".format(task_list)) if task_list != []: + logger.warning("INFO: Beginning cleaning stage plotting.") + + for task in task_list: + logger.warning("INFO: Generating plot {}.".format(task[2])) + file_output = "data/.logs/worker-6/{}-plot-{}".format(TIME, task[2]) + logger.warning("INFO: Saving output of plotting plugin to logfile {}.".format(file_output)) + fout = open(file_output, "wt") + + clean_param = clean_factory(task[1], task[2], task[3], ROOT_PATH) + + try: + subprocess.run([PYTHON_PATH, task[0], "clean", clean_param], stdout=fout, stderr=fout) + + except subprocess.SubprocessError: + logger.warning("ERROR: Plotting failed. Please review logfile {} for error diagnostics.".format(file_output)) + + fout.close() + comm.send(1, dest=0, tag=6) else: @@ -1857,7 +1963,6 @@ except subprocess.SubprocessError: logger.warning("ERROR: Evaluation for model {} failed. Please review logfile {} for error diagnostics.".format(model_name, file_output)) - # Close the training plugin log file fout.close() @@ -1889,6 +1994,24 @@ logger.warning("INFO: Received task list {} from manager.".format(task_list)) if task_list != []: + logger.warning("INFO: Beginning cleaning stage plotting.") + + for task in task_list: + logger.warning("INFO: Generating plot {}.".format(task[2])) + file_output = "data/.logs/worker-7/{}-plot-{}".format(TIME, task[2]) + logger.warning("INFO: Saving output of plotting plugin to logfile {}.".format(file_output)) + fout = open(file_output, "wt") + + clean_param = clean_factory(task[1], task[2], task[3], ROOT_PATH) + + try: + subprocess.run([PYTHON_PATH, task[0], "clean", clean_param], stdout=fout, stderr=fout) + + except subprocess.SubprocessError: + logger.warning("ERROR: Plotting failed. Please review logfile {} for error diagnostics.".format(file_output)) + + fout.close() + comm.send(1, dest=0, tag=7) else: diff --git a/utils/workerops/paramfactory.py b/utils/workerops/paramfactory.py index 5761ca6..692aa28 100644 --- a/utils/workerops/paramfactory.py +++ b/utils/workerops/paramfactory.py @@ -142,16 +142,28 @@ def attack_train_factory(adver_features: List[str], model_labels: np.ndarray, return pickle_path -def clean_factory() -> str: +def clean_factory(models: List[str], plot_name: str, save_path: str, root_path: str) -> str: """ Generate parameter dictionary that will be sent out to the cleaning plugins for the cleaning stage. Save as a pickle and return a file path reference to that pickle. ### Parameters: - - TODO + :param models: List of root model directories containing data for plots. + :param plot_name: Name to use for user-generated plot file. + :param save_path: System location save the adversarial examples. + :param root_path: Root directory of Jespipe. ### Returns: :return: System file path reference to pickled parameter dictionary. """ - # TODO: Update this function once you revisit the cleaning stage next week - pass + d = dict() + + d["model_list"] = models + d["plot_name"] = plot_name + d["save_path"] = save_path + + # Establish path to file in .tmp directory and dump dictionary + pickle_path = root_path + "/data/.tmp/" + str(uuid.uuid4()) + ".pkl" + joblib.dump(d, pickle_path) + + return pickle_path