Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: this refactors execution to properly use the process pool #352

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 42 additions & 12 deletions cadCAD/engine/__init__.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import itertools
from memory_profiler import profile
from time import time
from typing import Callable, Dict, List, Any, Tuple, Union, Sequence, Mapping
from typing import Callable, Dict, Generator, List, Any, Tuple, Union, Sequence, Mapping
from tqdm.auto import tqdm

from cadCAD.utils import flatten
from cadCAD.utils import flatten, lazy_flatten
from cadCAD.utils.execution import print_exec_info
from cadCAD.configuration import Configuration, Processor
from cadCAD.configuration.utils import TensorFieldReport, configs_as_objs, configs_as_dicts
Expand Down Expand Up @@ -80,6 +82,7 @@ def __init__(self,
self.configs = configs
self.empty_return = empty_return

@profile
def execute(self) -> Tuple[object, object, Dict[str, object]]:
if self.empty_return is True:
return [], [], []
Expand Down Expand Up @@ -142,21 +145,44 @@ def get_final_dist_results(simulations: List[StateHistory],
psu, ep) for psu, ep in list(zip(psus, eps))]
return simulations, tensor_fields, sessions

def get_final_results_lazy(simulations: Generator,
psus: List[StateUpdateBlocks],
eps,
sessions: List[SessionDict],
remote_threshold: int):
is_generator: bool = isinstance(simulations, Generator)
if is_generator == False:
raise ValueError(
'Invalid simulation results (Executor output is not a Generator required for lazy execution)')

tensor_fields = []
# NOTE here we change the result type to iterable
tensor_fields = itertools.chain.from_iterable(
map(create_tensor_field, zip(psus, eps)))

flat_simulations = map(
lazy_flatten, map(lazy_flatten, simulations))

# NOTE here we change the result type, which is now an iterable
iterable_flat_simulations = itertools.chain.from_iterable(
flat_simulations)

return iterable_flat_simulations, tensor_fields, sessions

def get_final_results(simulations: List[StateHistory],
psus: List[StateUpdateBlocks],
eps,
sessions: List[SessionDict],
remote_threshold: int):

# if list of lists of lists of dicts: do flatten
# if list of dicts: do not flatetn
# else raise error


init: bool = isinstance(simulations, Sequence)
failed_1 = False
failed_2 = False

try:
init: bool = isinstance(simulations, Sequence)
dont_flatten = init & isinstance(simulations[0], Mapping)
Expand All @@ -174,8 +200,8 @@ def get_final_results(simulations: List[StateHistory],
do_flatten = False

if failed_1 and failed_2:
raise ValueError('Invalid simulation results (Executor output is not list[dict] or list[list[list[dict]]])')

raise ValueError(
'Invalid simulation results (Executor output is not list[dict] or list[list[list[dict]]])')

flat_timesteps, tensor_fields = [], []
for sim_result, psu, ep in tqdm(list(zip(simulations, psus, eps)),
Expand All @@ -184,7 +210,7 @@ def get_final_results(simulations: List[StateHistory],
if do_flatten:
flat_timesteps.append(flatten(sim_result))
tensor_fields.append(create_tensor_field(psu, ep))

if do_flatten:
flat_simulations = flatten(flat_timesteps)
else:
Expand All @@ -209,15 +235,19 @@ def get_final_results(simulations: List[StateHistory],
else:
raise ValueError("Invalid execution mode specified")


print("Execution Method: " + self.exec_method.__name__)
simulations_results = self.exec_method(
sim_executors, var_dict_list, states_lists, configs_structs, env_processes_list, Ts, SimIDs, RunIDs,
ExpIDs, SubsetIDs, SubsetWindows, original_N, self.additional_objs
)

final_result = get_final_results(
simulations_results, partial_state_updates, eps, sessions, remote_threshold)
if (self.additional_objs is not None and self.additional_objs.get('lazy_eval', False)):
final_result = get_final_results_lazy(
simulations_results, partial_state_updates, eps, sessions, remote_threshold)
else:
final_result = get_final_results(
simulations_results, partial_state_updates, eps, sessions, remote_threshold)

elif self.exec_context == ExecutionMode.distributed:
print("Execution Method: " + self.exec_method.__name__)
simulations_results = self.exec_method(
Expand All @@ -228,6 +258,6 @@ def get_final_results(simulations: List[StateHistory],
simulations_results, partial_state_updates, eps, sessions)

t2 = time()
print(f"Total execution time: {t2 - t1 :.2f}s")
print(f"Total execution time: {t2 - t1:.2f}s")

return final_result
137 changes: 84 additions & 53 deletions cadCAD/engine/execution.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
from typing import Callable, Dict, List, Any, Tuple, Sequence
from pathos.multiprocessing import ProcessPool # type: ignore
import os
from typing import Callable, Dict, Generator, List, Any, Tuple, Sequence
from pathos.multiprocessing import ProcessPool # type: ignore
from collections import Counter
from cadCAD.types import *
from cadCAD.utils import flatten
from cadCAD.utils import flatten, lazy_flatten
import tempfile
import pickle
import sys
from memory_profiler import profile
import dill

VarDictType = Dict[str, List[object]]
StatesListsType = List[dict[str, object]]
Expand All @@ -25,15 +31,14 @@ def single_proc_exec(
configured_n: Sequence[N_Runs],
additional_objs=None
) -> List:



if not isinstance(var_dict_list, Sequence):
var_dict_list = list([var_dict_list])

raw_params = (
simulation_execs, states_lists, configs_structs, env_processes_list,
Ts, SimIDs, Ns, SubsetIDs, SubsetWindows, var_dict_list)

results: List = []
print(f'Execution Mode: single_threaded')
for raw_param in zip(*raw_params):
Expand All @@ -44,6 +49,54 @@ def single_proc_exec(
results.append(flatten(result))
return flatten(results)


def process_executor(params):
zcstarr marked this conversation as resolved.
Show resolved Hide resolved
simulation_exec, var_dict, states_list, config, env_processes, T, sim_id, N, subset_id, subset_window, configured_n = params

result = [simulation_exec(
var_dict, states_list, config, env_processes, T, sim_id, N, subset_id, subset_window, configured_n
)]
return result


def process_executor_disk(params):
simulation_exec, var_dict, states_list, config, env_processes, T, sim_id, N, subset_id, subset_window, configured_n = params

result = [simulation_exec(
var_dict, states_list, config, env_processes, T, sim_id, N, subset_id, subset_window, configured_n
)]
temp_file = tempfile.NamedTemporaryFile(delete=False)
with open(temp_file.name, 'wb') as f: # Note 'wb' for binary writing mode
dill.dump(result, f)
return temp_file.name


@profile
def file_handler_inc(filenames: List[str]) -> Generator[List, None, None]:
# combined_results = []
for file_name in filenames:
with open(file_name, 'rb') as f: # Note 'rb' for binary reading mode
result = dill.load(f)
yield result # Yield the loaded result for immediate processing

f.close()
os.remove(file_name) # Clean up temporary file


@profile
def file_handler(filenames: List[str]) -> Generator[List, None, None]:
combined_results = []
for file_name in filenames:
with open(file_name, 'rb') as f: # Note 'rb' for binary reading mode
result = dill.load(f)
combined_results.append(result)
result = None
f.close()
os.remove(file_name) # Clean up temporary file
return combined_results


@profile
def parallelize_simulations(
simulation_execs: List[ExecutorFunction],
var_dict_list: List[Parameters],
Expand All @@ -61,50 +114,28 @@ def parallelize_simulations(
):

print(f'Execution Mode: parallelized')
params = list(
zip(
simulation_execs, var_dict_list, states_lists, configs_structs, env_processes_list,
Ts, SimIDs, Ns, SubsetIDs, SubsetWindows
)
)

len_configs_structs = len(configs_structs)

unique_runs = Counter(SimIDs)
sim_count = max(unique_runs.values())
highest_divisor = int(len_configs_structs / sim_count)

new_configs_structs, new_params = [], []
for count in range(len(params)):
if count == 0:
new_params.append(
params[count: highest_divisor]
)
new_configs_structs.append(
configs_structs[count: highest_divisor]
)
elif count > 0:
new_params.append(
params[count * highest_divisor: (count + 1) * highest_divisor]
)
new_configs_structs.append(
configs_structs[count * highest_divisor: (count + 1) * highest_divisor]
)

def process_executor(params):
if len_configs_structs > 1:
with ProcessPool(processes=len_configs_structs) as pp:
results = pp.map(
lambda t: t[0](t[1], t[2], t[3], t[4], t[5], t[6], t[7], t[8], t[9], configured_n), params
)
else:
t = params[0]
results = t[0](t[1], t[2], t[3], t[4], t[5], t[6], t[7], t[8], t[9], configured_n)
return results

results = flatten(list(map(lambda params: process_executor(params), new_params)))

return results
lazy_eval = False
if (additional_objs):
lazy_eval = additional_objs.get('lazy_eval', False)

params = [
(sim_exec, var_dict, states_list, config, env_processes,
T, sim_id, N, subset_id, subset_window, configured_n)
for sim_exec, var_dict, states_list, config, env_processes, T, sim_id, N, subset_id, subset_window in
zip(simulation_execs, var_dict_list, states_lists, configs_structs,
env_processes_list, Ts, SimIDs, Ns, SubsetIDs, SubsetWindows)
]

if (lazy_eval):
with ProcessPool(maxtasksperchild=1) as pool:
temp_files = pool.map(process_executor_disk, params)
generator = file_handler_inc(temp_files)
return lazy_flatten(generator)

with ProcessPool(maxtasksperchild=1) as pool:
results = pool.map(process_executor, params)

return flatten(results)


def local_simulations(
Expand All @@ -121,15 +152,15 @@ def local_simulations(
SubsetWindows: List[SubsetWindow],
configured_n: List[N_Runs],
additional_objs=None
):
):
config_amt = len(configs_structs)

if config_amt == 1: # and configured_n != 1
if config_amt == 1: # and configured_n != 1
return single_proc_exec(
simulation_execs, var_dict_list, states_lists, configs_structs, env_processes_list,
Ts, SimIDs, Ns, ExpIDs, SubsetIDs, SubsetWindows, configured_n, additional_objs
)
elif config_amt > 1: # and configured_n != 1
elif config_amt > 1: # and configured_n != 1
return parallelize_simulations(
simulation_execs, var_dict_list, states_lists, configs_structs, env_processes_list,
Ts, SimIDs, Ns, ExpIDs, SubsetIDs, SubsetWindows, configured_n, additional_objs
Expand Down
6 changes: 5 additions & 1 deletion cadCAD/tools/execution/easy_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ def easy_run(
drop_substeps=True,
exec_mode='local',
deepcopy_off=False,
lazy_eval=False
) -> pd.DataFrame:
"""
Run cadCAD simulations without headaches.
Expand All @@ -66,7 +67,10 @@ def easy_run(
_exec_mode = ExecutionMode().local_mode
elif exec_mode == 'single':
_exec_mode = ExecutionMode().single_mode
exec_context = ExecutionContext(_exec_mode, additional_objs={'deepcopy_off': deepcopy_off})
exec_context = ExecutionContext(_exec_mode, additional_objs={
'deepcopy_off': deepcopy_off,
'lazy_eval': lazy_eval
})
executor = Executor(exec_context=exec_context, configs=configs)

# Execute the cadCAD experiment
Expand Down
34 changes: 31 additions & 3 deletions cadCAD/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@
from collections import defaultdict
from itertools import product
import warnings
from typing import Union
from typing import Any, Generator, Union
from cadCAD.types import *
from typing import List, Dict, Union

import functools
import operator

from pandas import DataFrame # type: ignore
from pandas import DataFrame # type: ignore


class SilentDF(DataFrame):
Expand All @@ -33,7 +33,8 @@ def arrange_cols(df: DataFrame, reverse=False) -> DataFrame:
"""
session_metrics = ['session_id', 'user_id', 'simulation_id', 'run_id']
sys_metrics = ['run', 'timestep', 'substep']
result_cols = list(set(df.columns) - set(session_metrics) - set(sys_metrics))
result_cols = list(set(df.columns) -
set(session_metrics) - set(sys_metrics))
result_cols.sort(reverse=reverse)
return df[session_metrics + sys_metrics + result_cols]

Expand Down Expand Up @@ -75,6 +76,7 @@ def tupalize(k: object, vs: Union[list, dict]):
l.append((k, vs))
return l


def flattenDict(l: dict) -> list:
"""
>>> flattenDict({1: [1, 2, 3], 4: 5})
Expand All @@ -92,6 +94,32 @@ def flatten(l: Union[list, dict]):
return flattenDict(l)


# Incremental version of flatten with type hints
def lazy_tupalize(k: Any, vs: Union[Iterable[Any], Any]) -> Generator[tuple, None, None]:
if isinstance(vs, Iterable) and not isinstance(vs, str):
for v in vs:
yield (k, v)
else:
yield (k, vs)


def lazy_flattenDict(d: Dict[Any, Any]) -> Generator[Dict[Any, Any], None, None]:
flat_list = (lazy_tupalize(k, vs) for k, vs in d.items())
for items in product(*flat_list):
yield dict(items)


def lazy_flatten(l: Union[Iterable[Any], Dict[Any, Any]]) -> Generator[Any, None, None]:
if isinstance(l, Iterable) and not isinstance(l, (str, dict)):
for item in l:
if isinstance(item, Iterable) and not isinstance(item, (str, dict)):
yield from lazy_flatten(item)
else:
yield item
elif isinstance(l, dict):
yield from lazy_flattenDict(l)


def flatMap(f, collection):
return flatten(list(map(f, collection)))

Expand Down
Loading