Skip to content

Commit

Permalink
Merge pull request #140 from optimas-org/feature/allow_local_threading
Browse files Browse the repository at this point in the history
Enable local threading communications
  • Loading branch information
MaxThevenet authored Nov 9, 2023
2 parents 1e8c782 + 0ec1fd5 commit 8fcaa93
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 27 deletions.
21 changes: 3 additions & 18 deletions optimas/evaluators/function_evaluator.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""Contains the definition of the FunctionEvaluator class."""

from typing import Callable, Optional, Dict, List
from typing import Callable, Dict, List

from optimas.sim_functions import run_function
from optimas.core import VaryingParameter, Objective, Parameter
Expand All @@ -14,26 +14,11 @@ class FunctionEvaluator(Evaluator):
----------
function : callable
The function to be evaluated.
n_procs : int, optional
The number of processes that will be used for each evaluation. By
default, ``n_procs=1`` if ``n_gpus`` is not given. Otherwise, the
default behavior is to match the number of processes to the number
of GPUs, i.e., ``n_procs=n_gpus``.
n_gpus : int, optional
The number of GPUs that will be made available for each evaluation. By
default, 0.
"""

def __init__(
self,
function: Callable,
n_procs: Optional[int] = None,
n_gpus: Optional[int] = None,
) -> None:
super().__init__(
sim_function=run_function, n_procs=n_procs, n_gpus=n_gpus
)
def __init__(self, function: Callable) -> None:
super().__init__(sim_function=run_function)
self.function = function

def get_sim_specs(
Expand Down
34 changes: 25 additions & 9 deletions optimas/explorations/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import os
import glob
from typing import Optional, Union, Dict, List
from typing import Optional, Union, Dict, List, Literal

import numpy as np
import pandas as pd
Expand All @@ -15,6 +15,7 @@

from optimas.generators.base import Generator
from optimas.evaluators.base import Evaluator
from optimas.evaluators.function_evaluator import FunctionEvaluator
from optimas.utils.logger import get_logger
from optimas.utils.other import convert_to_dataframe

Expand Down Expand Up @@ -60,12 +61,17 @@ class Exploration:
There is no need to provide the `history` path (it will be ignored).
If `False` (default value), the exploration will raise an error if
the `exploration_dir_path` already exists.
libe_comms : {'local', 'mpi'}, optional.
libe_comms : {'local', 'local_threading', 'mpi'}, optional.
The communication mode for libEnseble. Determines whether to use
Python ``multiprocessing`` (local mode) or MPI for the communication
between the manager and workers. If running in ``'mpi'`` mode, the
Optimas script should be launched with ``mpirun`` or equivalent, for
example, ``mpirun -np N python myscript.py``. This will launch one
Python ``multiprocessing`` (local), ``threading`` (local_threading)
or MPI for the communication between the manager and workers.
The ``'local_threading'`` mode is only recommended when running in a
Jupyter notebook if the default 'local' mode has issues (this
can happen especially on Windows and Mac, which use multiprocessing
``spawn``). ``'local_threading'`` only supports ``FunctionEvaluator``s.
If running in ``'mpi'`` mode, the Optimas script should be launched
with ``mpirun`` or equivalent, for example,
``mpirun -np N python myscript.py``. This will launch one
manager and ``N-1`` simulation workers. In this case, the
``sim_workers`` parameter is ignored. By default, ``'local'`` mode
is used.
Expand All @@ -83,8 +89,17 @@ def __init__(
history_save_period: Optional[int] = None,
exploration_dir_path: Optional[str] = "./exploration",
resume: Optional[bool] = False,
libe_comms: Optional[str] = "local",
libe_comms: Optional[
Literal["local", "local_threading", "mpi"]
] = "local",
) -> None:
if libe_comms == "local_threading" and not isinstance(
evaluator, FunctionEvaluator
):
raise ValueError(
"'local_threading' mode is only supported when using a "
"`FunctionEvaluator`. Use 'local' mode instead."
)
self.generator = generator
self.evaluator = evaluator
self.max_evals = max_evals
Expand Down Expand Up @@ -480,7 +495,7 @@ def _set_default_libe_specs(self) -> None:
libE_specs["dedicated_mode"] = False
# Set communications and corresponding number of workers.
libE_specs["comms"] = self.libe_comms
if self.libe_comms == "local":
if self.libe_comms in ["local", "local_threading"]:
libE_specs["nworkers"] = self.sim_workers + 1
elif self.libe_comms == "mpi":
# Warn user if openmpi is being used.
Expand All @@ -499,7 +514,8 @@ def _set_default_libe_specs(self) -> None:
else:
raise ValueError(
"Communication mode '{}'".format(self.libe_comms)
+ " not recognized. Possible values are 'local' or 'mpi'."
+ " not recognized. Possible values are 'local', "
+ "'local_threading' or 'mpi'."
)
# Set exploration directory path.
libE_specs["ensemble_dir_path"] = "evaluations"
Expand Down
54 changes: 54 additions & 0 deletions tests/test_comms.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import numpy as np

from optimas.explorations import Exploration
from optimas.generators import RandomSamplingGenerator
from optimas.evaluators import FunctionEvaluator
from optimas.core import VaryingParameter, Objective


def eval_func(input_params, output_params):
"""Evaluation function used for testing"""
x0 = input_params["x0"]
x1 = input_params["x1"]
result = -(x0 + 10 * np.cos(x0)) * (x1 + 5 * np.cos(x1))
output_params["f"] = result


def test_libe_comms():
"""Test local and local_threading communications."""
# Define variables and objectives.
var1 = VaryingParameter("x0", -50.0, 5.0)
var2 = VaryingParameter("x1", -5.0, 15.0)
obj = Objective("f", minimize=False)

max_evals = 10

for comm in ["local", "local_threading"]:
# Create generator.
gen = RandomSamplingGenerator(
varying_parameters=[var1, var2], objectives=[obj]
)

# Create function evaluator.
ev = FunctionEvaluator(function=eval_func)

# Create exploration.
exploration = Exploration(
generator=gen,
evaluator=ev,
max_evals=max_evals,
sim_workers=2,
exploration_dir_path=f"./tests_output/test_comms_{comm}",
libe_comms=comm,
)

# Run exploration.
exploration.run()

# Check that all trials were evaluated.
assert np.all(exploration.history["f"] != 0.0)
assert len(exploration.history) == max_evals


if __name__ == "__main__":
test_libe_comms()

0 comments on commit 8fcaa93

Please sign in to comment.