Skip to content

Commit

Permalink
Merge pull request #27 from NucciTheBoss/cleaning_parallel
Browse files Browse the repository at this point in the history
Add support for cleaning stage plugins
  • Loading branch information
NucciTheBoss committed Jul 19, 2021
2 parents 2a3f493 + ffe4cd4 commit 277ea06
Show file tree
Hide file tree
Showing 2 changed files with 146 additions and 11 deletions.
137 changes: 130 additions & 7 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,10 @@
raise FileNotFoundError(Fore.RED + "The plugin {} is not found. Please verify that you are using the correct file path.".format(
clean_control["plot"][key]["plugin"]))

# Create plot directory to save plots
print_info("Creating directory to save plots.")
os.makedirs("data/plots", exist_ok=True)

print_info("Generating directive list for worker nodes.")
# Generate and slice directive list that will be sent out to the workers
clean_directive_list = sst.generate_clean(clean_control["plot"], ROOT_PATH + "/data/plots", ROOT_PATH + "/data")
Expand Down Expand Up @@ -603,7 +607,6 @@
except subprocess.SubprocessError:
logger.warning("ERROR: Evaluation for model {} failed. Please review logfile {} for error diagnostics.".format(model_name, file_output))


# Close the training plugin log file
fout.close()

Expand Down Expand Up @@ -635,6 +638,24 @@
logger.warning("INFO: Received task list {} from manager.".format(task_list))

if task_list != []:
logger.warning("INFO: Beginning cleaning stage plotting.")

for task in task_list:
logger.warning("INFO: Generating plot {}.".format(task[2]))
file_output = "data/.logs/worker-1/{}-plot-{}".format(TIME, task[2])
logger.warning("INFO: Saving output of plotting plugin to logfile {}.".format(file_output))
fout = open(file_output, "wt")

clean_param = clean_factory(task[1], task[2], task[3], ROOT_PATH)

try:
subprocess.run([PYTHON_PATH, task[0], "clean", clean_param], stdout=fout, stderr=fout)

except subprocess.SubprocessError:
logger.warning("ERROR: Plotting failed. Please review logfile {} for error diagnostics.".format(file_output))

fout.close()

comm.send(1, dest=0, tag=1)

else:
Expand Down Expand Up @@ -812,7 +833,6 @@
except subprocess.SubprocessError:
logger.warning("ERROR: Evaluation for model {} failed. Please review logfile {} for error diagnostics.".format(model_name, file_output))


# Close the training plugin log file
fout.close()

Expand Down Expand Up @@ -844,6 +864,24 @@
logger.warning("INFO: Received task list {} from manager.".format(task_list))

if task_list != []:
logger.warning("INFO: Beginning cleaning stage plotting.")

for task in task_list:
logger.warning("INFO: Generating plot {}.".format(task[2]))
file_output = "data/.logs/worker-2/{}-plot-{}".format(TIME, task[2])
logger.warning("INFO: Saving output of plotting plugin to logfile {}.".format(file_output))
fout = open(file_output, "wt")

clean_param = clean_factory(task[1], task[2], task[3], ROOT_PATH)

try:
subprocess.run([PYTHON_PATH, task[0], "clean", clean_param], stdout=fout, stderr=fout)

except subprocess.SubprocessError:
logger.warning("ERROR: Plotting failed. Please review logfile {} for error diagnostics.".format(file_output))

fout.close()

comm.send(1, dest=0, tag=2)

else:
Expand Down Expand Up @@ -1021,7 +1059,6 @@
except subprocess.SubprocessError:
logger.warning("ERROR: Evaluation for model {} failed. Please review logfile {} for error diagnostics.".format(model_name, file_output))


# Close the training plugin log file
fout.close()

Expand Down Expand Up @@ -1053,6 +1090,24 @@
logger.warning("INFO: Received task list {} from manager.".format(task_list))

if task_list != []:
logger.warning("INFO: Beginning cleaning stage plotting.")

for task in task_list:
logger.warning("INFO: Generating plot {}.".format(task[2]))
file_output = "data/.logs/worker-3/{}-plot-{}".format(TIME, task[2])
logger.warning("INFO: Saving output of plotting plugin to logfile {}.".format(file_output))
fout = open(file_output, "wt")

clean_param = clean_factory(task[1], task[2], task[3], ROOT_PATH)

try:
subprocess.run([PYTHON_PATH, task[0], "clean", clean_param], stdout=fout, stderr=fout)

except subprocess.SubprocessError:
logger.warning("ERROR: Plotting failed. Please review logfile {} for error diagnostics.".format(file_output))

fout.close()

comm.send(1, dest=0, tag=3)

else:
Expand Down Expand Up @@ -1230,7 +1285,6 @@
except subprocess.SubprocessError:
logger.warning("ERROR: Evaluation for model {} failed. Please review logfile {} for error diagnostics.".format(model_name, file_output))


# Close the training plugin log file
fout.close()

Expand Down Expand Up @@ -1262,6 +1316,24 @@
logger.warning("INFO: Received task list {} from manager.".format(task_list))

if task_list != []:
logger.warning("INFO: Beginning cleaning stage plotting.")

for task in task_list:
logger.warning("INFO: Generating plot {}.".format(task[2]))
file_output = "data/.logs/worker-4/{}-plot-{}".format(TIME, task[2])
logger.warning("INFO: Saving output of plotting plugin to logfile {}.".format(file_output))
fout = open(file_output, "wt")

clean_param = clean_factory(task[1], task[2], task[3], ROOT_PATH)

try:
subprocess.run([PYTHON_PATH, task[0], "clean", clean_param], stdout=fout, stderr=fout)

except subprocess.SubprocessError:
logger.warning("ERROR: Plotting failed. Please review logfile {} for error diagnostics.".format(file_output))

fout.close()

comm.send(1, dest=0, tag=4)

else:
Expand Down Expand Up @@ -1439,7 +1511,6 @@
except subprocess.SubprocessError:
logger.warning("ERROR: Evaluation for model {} failed. Please review logfile {} for error diagnostics.".format(model_name, file_output))


# Close the training plugin log file
fout.close()

Expand Down Expand Up @@ -1471,6 +1542,24 @@
logger.warning("INFO: Received task list {} from manager.".format(task_list))

if task_list != []:
logger.warning("INFO: Beginning cleaning stage plotting.")

for task in task_list:
logger.warning("INFO: Generating plot {}.".format(task[2]))
file_output = "data/.logs/worker-5/{}-plot-{}".format(TIME, task[2])
logger.warning("INFO: Saving output of plotting plugin to logfile {}.".format(file_output))
fout = open(file_output, "wt")

clean_param = clean_factory(task[1], task[2], task[3], ROOT_PATH)

try:
subprocess.run([PYTHON_PATH, task[0], "clean", clean_param], stdout=fout, stderr=fout)

except subprocess.SubprocessError:
logger.warning("ERROR: Plotting failed. Please review logfile {} for error diagnostics.".format(file_output))

fout.close()

comm.send(1, dest=0, tag=5)

else:
Expand Down Expand Up @@ -1648,7 +1737,6 @@
except subprocess.SubprocessError:
logger.warning("ERROR: Evaluation for model {} failed. Please review logfile {} for error diagnostics.".format(model_name, file_output))


# Close the training plugin log file
fout.close()

Expand Down Expand Up @@ -1680,6 +1768,24 @@
logger.warning("INFO: Received task list {} from manager.".format(task_list))

if task_list != []:
logger.warning("INFO: Beginning cleaning stage plotting.")

for task in task_list:
logger.warning("INFO: Generating plot {}.".format(task[2]))
file_output = "data/.logs/worker-6/{}-plot-{}".format(TIME, task[2])
logger.warning("INFO: Saving output of plotting plugin to logfile {}.".format(file_output))
fout = open(file_output, "wt")

clean_param = clean_factory(task[1], task[2], task[3], ROOT_PATH)

try:
subprocess.run([PYTHON_PATH, task[0], "clean", clean_param], stdout=fout, stderr=fout)

except subprocess.SubprocessError:
logger.warning("ERROR: Plotting failed. Please review logfile {} for error diagnostics.".format(file_output))

fout.close()

comm.send(1, dest=0, tag=6)

else:
Expand Down Expand Up @@ -1857,7 +1963,6 @@
except subprocess.SubprocessError:
logger.warning("ERROR: Evaluation for model {} failed. Please review logfile {} for error diagnostics.".format(model_name, file_output))


# Close the training plugin log file
fout.close()

Expand Down Expand Up @@ -1889,6 +1994,24 @@
logger.warning("INFO: Received task list {} from manager.".format(task_list))

if task_list != []:
logger.warning("INFO: Beginning cleaning stage plotting.")

for task in task_list:
logger.warning("INFO: Generating plot {}.".format(task[2]))
file_output = "data/.logs/worker-7/{}-plot-{}".format(TIME, task[2])
logger.warning("INFO: Saving output of plotting plugin to logfile {}.".format(file_output))
fout = open(file_output, "wt")

clean_param = clean_factory(task[1], task[2], task[3], ROOT_PATH)

try:
subprocess.run([PYTHON_PATH, task[0], "clean", clean_param], stdout=fout, stderr=fout)

except subprocess.SubprocessError:
logger.warning("ERROR: Plotting failed. Please review logfile {} for error diagnostics.".format(file_output))

fout.close()

comm.send(1, dest=0, tag=7)

else:
Expand Down
20 changes: 16 additions & 4 deletions utils/workerops/paramfactory.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,16 +142,28 @@ def attack_train_factory(adver_features: List[str], model_labels: np.ndarray,
return pickle_path


def clean_factory() -> str:
def clean_factory(models: List[str], plot_name: str, save_path: str, root_path: str) -> str:
"""
Generate parameter dictionary that will be sent out to the cleaning plugins for the cleaning stage.
Save as a pickle and return a file path reference to that pickle.
### Parameters:
- TODO
:param models: List of root model directories containing data for plots.
:param plot_name: Name to use for user-generated plot file.
:param save_path: System location save the adversarial examples.
:param root_path: Root directory of Jespipe.
### Returns:
:return: System file path reference to pickled parameter dictionary.
"""
# TODO: Update this function once you revisit the cleaning stage next week
pass
d = dict()

d["model_list"] = models
d["plot_name"] = plot_name
d["save_path"] = save_path

# Establish path to file in .tmp directory and dump dictionary
pickle_path = root_path + "/data/.tmp/" + str(uuid.uuid4()) + ".pkl"
joblib.dump(d, pickle_path)

return pickle_path

0 comments on commit 277ea06

Please sign in to comment.